From 4686461bc72e36bfdc91c30baaa05fb521fece4c Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 8 Feb 2016 02:12:08 +0100 Subject: [PATCH] Tweaked publication/dispatch and which threads (if async) get the subscriptions for the messages (previously, the calling thread did this. NOW whatever thread the dispatcher is in does this --- src/dorkbox/messagebus/IMessageBus.java | 2 +- src/dorkbox/messagebus/MessageBus.java | 46 ++--- .../Publisher.java => dispatch/Dispatch.java} | 12 +- .../messagebus/dispatch/DispatchExact.java | 153 +++++++++++++++ .../dispatch/DispatchExactWithSuperTypes.java | 179 ++++++++++++++++++ .../publication/PublisherExact.java | 66 ------- .../PublisherExactWithSuperTypes.java | 72 ------- .../messagebus/synchrony/AsyncABQ.java | 66 ++++--- .../messagebus/synchrony/AsyncABQ_noGc.java | 67 ++++--- .../messagebus/synchrony/AsyncDisruptor.java | 28 +-- .../messagebus/synchrony/MessageHolder.java | 5 +- src/dorkbox/messagebus/synchrony/Sync.java | 136 +------------ .../messagebus/synchrony/Synchrony.java | 8 +- .../synchrony/disruptor/MessageHandler.java | 16 +- .../util/messagebus/MultiMessageTest.java | 4 +- 15 files changed, 461 insertions(+), 399 deletions(-) rename src/dorkbox/messagebus/{publication/Publisher.java => dispatch/Dispatch.java} (62%) create mode 100644 src/dorkbox/messagebus/dispatch/DispatchExact.java create mode 100644 src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java delete mode 100644 src/dorkbox/messagebus/publication/PublisherExact.java delete mode 100644 src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java diff --git a/src/dorkbox/messagebus/IMessageBus.java b/src/dorkbox/messagebus/IMessageBus.java index 6ec3ff6..ac427f4 100644 --- a/src/dorkbox/messagebus/IMessageBus.java +++ b/src/dorkbox/messagebus/IMessageBus.java @@ -100,7 +100,7 @@ import dorkbox.messagebus.error.IPublicationErrorHandler; */ public interface IMessageBus extends PubSubSupport { - enum PublishMode { + enum DispatchMode { /** * Will only publish to listeners with this exact message signature. This is the fastest */ diff --git a/src/dorkbox/messagebus/MessageBus.java b/src/dorkbox/messagebus/MessageBus.java index fbdb335..db7bb9d 100644 --- a/src/dorkbox/messagebus/MessageBus.java +++ b/src/dorkbox/messagebus/MessageBus.java @@ -17,9 +17,9 @@ package dorkbox.messagebus; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.IPublicationErrorHandler; -import dorkbox.messagebus.publication.Publisher; -import dorkbox.messagebus.publication.PublisherExact; -import dorkbox.messagebus.publication.PublisherExactWithSuperTypes; +import dorkbox.messagebus.dispatch.Dispatch; +import dorkbox.messagebus.dispatch.DispatchExact; +import dorkbox.messagebus.dispatch.DispatchExactWithSuperTypes; import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.synchrony.AsyncABQ; import dorkbox.messagebus.synchrony.AsyncABQ_noGc; @@ -106,7 +106,7 @@ class MessageBus implements IMessageBus { private final SubscriptionManager subscriptionManager; - private final Publisher publisher; + private final Dispatch dispatch; private final Synchrony syncPublication; private final Synchrony asyncPublication; @@ -124,26 +124,26 @@ class MessageBus implements IMessageBus { * @param numberOfThreads how many threads to use for dispatching async messages */ public - MessageBus(int numberOfThreads) { - this(PublishMode.ExactWithSuperTypes, numberOfThreads); + MessageBus(final int numberOfThreads) { + this(DispatchMode.ExactWithSuperTypes, numberOfThreads); } /** * By default, will use half of CPUs available for dispatching async messages * - * @param publishMode Specifies which publishMode to operate the publication of messages. + * @param dispatchMode Specifies which publishMode to operate the publication of messages. */ public - MessageBus(final PublishMode publishMode) { - this(publishMode, Runtime.getRuntime().availableProcessors()); + MessageBus(final DispatchMode dispatchMode) { + this(dispatchMode, Runtime.getRuntime().availableProcessors()); } /** - * @param publishMode Specifies which publishMode to operate the publication of messages. + * @param dispatchMode Specifies which publishMode to operate the publication of messages. * @param numberOfThreads how many threads to use for dispatching async messages */ public - MessageBus(final PublishMode publishMode, int numberOfThreads) { + MessageBus(final DispatchMode dispatchMode, int numberOfThreads) { // round to the nearest power of 2 numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1)); @@ -154,22 +154,22 @@ class MessageBus implements IMessageBus { */ this.subscriptionManager = new SubscriptionManager(useStrongReferencesByDefault); - switch (publishMode) { + switch (dispatchMode) { case Exact: - publisher = new PublisherExact(subscriptionManager); + dispatch = new DispatchExact(errorHandler, subscriptionManager); break; case ExactWithSuperTypes: default: - publisher = new PublisherExactWithSuperTypes(errorHandler, subscriptionManager); + dispatch = new DispatchExactWithSuperTypes(errorHandler, subscriptionManager); break; } - syncPublication = new Sync(errorHandler, subscriptionManager); + syncPublication = new Sync(); // the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue if (useDisruptorForAsyncPublish) { - asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication); + asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication, subscriptionManager); } else { if (useZeroGarbageVersionOfABQ) { // no garbage is created, but this is slow (but faster than other messagebus implementations) @@ -239,8 +239,8 @@ class MessageBus implements IMessageBus { */ @Override public - void publish(final Object message) { - publisher.publish(syncPublication, message); + void publish(final Object message1) { + syncPublication.publish(dispatch, message1); } @@ -253,7 +253,7 @@ class MessageBus implements IMessageBus { @Override public void publish(final Object message1, final Object message2) { - publisher.publish(syncPublication, message1, message2); + syncPublication.publish(dispatch, message1, message2); } @@ -266,7 +266,7 @@ class MessageBus implements IMessageBus { @Override public void publish(final Object message1, final Object message2, final Object message3) { - publisher.publish(syncPublication, message1, message2, message3); + syncPublication.publish(dispatch, message1, message2, message3); } @@ -278,7 +278,7 @@ class MessageBus implements IMessageBus { @Override public void publishAsync(final Object message) { - publisher.publish(asyncPublication, message); + asyncPublication.publish(dispatch, message); } @@ -290,7 +290,7 @@ class MessageBus implements IMessageBus { @Override public void publishAsync(final Object message1, final Object message2) { - publisher.publish(asyncPublication, message1, message2); + asyncPublication.publish(dispatch, message1, message2); } @@ -302,7 +302,7 @@ class MessageBus implements IMessageBus { @Override public void publishAsync(final Object message1, final Object message2, final Object message3) { - publisher.publish(asyncPublication, message1, message2, message3); + asyncPublication.publish(dispatch, message1, message2, message3); } diff --git a/src/dorkbox/messagebus/publication/Publisher.java b/src/dorkbox/messagebus/dispatch/Dispatch.java similarity index 62% rename from src/dorkbox/messagebus/publication/Publisher.java rename to src/dorkbox/messagebus/dispatch/Dispatch.java index bf80e1a..dc77d4b 100644 --- a/src/dorkbox/messagebus/publication/Publisher.java +++ b/src/dorkbox/messagebus/dispatch/Dispatch.java @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.messagebus.publication; +package dorkbox.messagebus.dispatch; -import dorkbox.messagebus.synchrony.Synchrony; - -public interface Publisher { - void publish(final Synchrony synchrony, Object message1); - void publish(final Synchrony synchrony, Object message1, Object message2); - void publish(final Synchrony synchrony, Object message1, Object message2, Object message3); +public interface Dispatch { + void publish(Object message1); + void publish(Object message1, Object message2); + void publish(Object message1, Object message2, Object message3); } diff --git a/src/dorkbox/messagebus/dispatch/DispatchExact.java b/src/dorkbox/messagebus/dispatch/DispatchExact.java new file mode 100644 index 0000000..caf4252 --- /dev/null +++ b/src/dorkbox/messagebus/dispatch/DispatchExact.java @@ -0,0 +1,153 @@ +/* + * Copyright 2015 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.messagebus.dispatch; + +import dorkbox.messagebus.error.DeadMessage; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.subscription.SubscriptionManager; + +/** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + */ +@SuppressWarnings("Duplicates") +public +class DispatchExact implements Dispatch { + private final ErrorHandler errorHandler; + private final SubscriptionManager subManager; + + public + DispatchExact(final ErrorHandler errorHandler, final SubscriptionManager subManager) { + this.errorHandler = errorHandler; + this.subManager = subManager; + } + + @Override + public + void publish(final Object message1) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } + + @Override + public + void publish(final Object message1, final Object message2) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } + + @Override + public + void publish(final Object message1, final Object message2, final Object message3) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + final Class messageClass3 = message3.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } +} diff --git a/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java b/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java new file mode 100644 index 0000000..96a05a6 --- /dev/null +++ b/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java @@ -0,0 +1,179 @@ +/* + * Copyright 2015 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.messagebus.dispatch; + +import dorkbox.messagebus.error.DeadMessage; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.subscription.SubscriptionManager; + +@SuppressWarnings("Duplicates") +public +class DispatchExactWithSuperTypes implements Dispatch { + + private final ErrorHandler errorHandler; + private final SubscriptionManager subManager; + + public + DispatchExactWithSuperTypes(final ErrorHandler errorHandler, final SubscriptionManager subManager) { + this.errorHandler = errorHandler; + this.subManager = subManager; + } + + @Override + public + void publish(final Object message1) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } + + @Override + public + void publish(final Object message1, final Object message2) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } + + @Override + public + void publish(final Object message1, final Object message2, final Object message3) { + final ErrorHandler errorHandler = this.errorHandler; + final SubscriptionManager subManager = this.subManager; + + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + final Class messageClass3 = message3.getClass(); + + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null + + Subscription sub; + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } +} diff --git a/src/dorkbox/messagebus/publication/PublisherExact.java b/src/dorkbox/messagebus/publication/PublisherExact.java deleted file mode 100644 index 3d8fc74..0000000 --- a/src/dorkbox/messagebus/publication/PublisherExact.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.messagebus.publication; - -import dorkbox.messagebus.subscription.Subscription; -import dorkbox.messagebus.subscription.SubscriptionManager; -import dorkbox.messagebus.synchrony.Synchrony; - -/** - * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. - * - * The exception to this rule is when checking/calling DeadMessage publication. - */ -@SuppressWarnings("Duplicates") -public -class PublisherExact implements Publisher { - private final SubscriptionManager subManager; - - public - PublisherExact(final SubscriptionManager subManager) { - this.subManager = subManager; - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1) { - final Class messageClass = message1.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null - synchrony.publish(subscriptions, null, message1); - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1, final Object message2) { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null - synchrony.publish(subscriptions, null, message1, message2); - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - final Class messageClass3 = message3.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null - synchrony.publish(subscriptions, null, message1, message2, message3); - } -} diff --git a/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java deleted file mode 100644 index c4be025..0000000 --- a/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.messagebus.publication; - -import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.subscription.Subscription; -import dorkbox.messagebus.subscription.SubscriptionManager; -import dorkbox.messagebus.synchrony.Synchrony; - -@SuppressWarnings("Duplicates") -public -class PublisherExactWithSuperTypes implements Publisher { - - private final ErrorHandler errorHandler; - private final SubscriptionManager subManager; - - public - PublisherExactWithSuperTypes(final ErrorHandler errorHandler, final SubscriptionManager subManager) { - this.errorHandler = errorHandler; - this.subManager = subManager; - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1) { - final SubscriptionManager subManager = this.subManager; - final Class messageClass1 = message1.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null - - synchrony.publish(subscriptions, superSubscriptions, message1); - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1, final Object message2) { - final SubscriptionManager subManager = this.subManager; - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null - synchrony.publish(subscriptions, superSubscriptions, message1, message2); - } - - @Override - public - void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { - final SubscriptionManager subManager = this.subManager; - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - final Class messageClass3 = message3.getClass(); - - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null - synchrony.publish(subscriptions, superSubscriptions, message1, message2, message3); - } -} diff --git a/src/dorkbox/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/messagebus/synchrony/AsyncABQ.java index e581e7b..993191c 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncABQ.java +++ b/src/dorkbox/messagebus/synchrony/AsyncABQ.java @@ -18,7 +18,7 @@ package dorkbox.messagebus.synchrony; import dorkbox.messagebus.common.NamedThreadFactory; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; -import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.dispatch.Dispatch; import dorkbox.messagebus.synchrony.disruptor.MessageType; import java.util.ArrayDeque; @@ -70,7 +70,7 @@ class AsyncABQ implements Synchrony { final ErrorHandler errorHandler1 = errorHandler; while (!AsyncABQ.this.shuttingDown) { - process(IN_QUEUE, syncPublication1, errorHandler1); + process(IN_QUEUE, errorHandler1); } synchronized (shutdown) { @@ -92,12 +92,11 @@ class AsyncABQ implements Synchrony { @SuppressWarnings("Duplicates") private - void process(final ArrayBlockingQueue queue, final Synchrony sync, final ErrorHandler errorHandler) { + void process(final ArrayBlockingQueue queue, final ErrorHandler errorHandler) { MessageHolder event; int messageType = MessageType.ONE; - Subscription[] subs; - Subscription[] superSubs; + Dispatch dispatch; Object message1 = null; Object message2 = null; Object message3 = null; @@ -106,23 +105,22 @@ class AsyncABQ implements Synchrony { event = queue.take(); messageType = event.type; - subs = event.subs; - superSubs = event.superSubs; + dispatch = event.dispatch; message1 = event.message1; message2 = event.message2; message3 = event.message3; switch (messageType) { case MessageType.ONE: { - sync.publish(subs, superSubs, message1); + dispatch.publish(message1); return; } case MessageType.TWO: { - sync.publish(subs, superSubs, message1, message2); + dispatch.publish(message1, message2); return; } case MessageType.THREE: { - sync.publish(subs, superSubs, message1, message2, message3); + dispatch.publish(message1, message2, message3); //noinspection UnnecessaryReturnStatement return; } @@ -156,16 +154,16 @@ class AsyncABQ implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { - MessageHolder take = new MessageHolder(); + void publish(final Dispatch dispatch, final Object message1) { + MessageHolder job = new MessageHolder(); - take.type = MessageType.ONE; - take.subs = subscriptions; - take.superSubs = superSubscriptions; - take.message1 = message1; + job.type = MessageType.ONE; + job.dispatch = dispatch; + + job.message1 = message1; try { - this.dispatchQueue.put(take); + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) @@ -175,17 +173,17 @@ class AsyncABQ implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { - MessageHolder take = new MessageHolder(); + void publish(final Dispatch dispatch, final Object message1, final Object message2) { + MessageHolder job = new MessageHolder(); - take.type = MessageType.TWO; - take.subs = subscriptions; - take.superSubs = superSubscriptions; - take.message1 = message1; - take.message2 = message2; + job.type = MessageType.TWO; + job.dispatch = dispatch; + + job.message1 = message1; + job.message2 = message2; try { - this.dispatchQueue.put(take); + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) @@ -195,18 +193,18 @@ class AsyncABQ implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { - MessageHolder take = new MessageHolder(); + void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) { + MessageHolder job = new MessageHolder(); - take.type = MessageType.THREE; - take.subs = subscriptions; - take.superSubs = superSubscriptions; - take.message1 = message1; - take.message2 = message2; - take.message3 = message3; + job.type = MessageType.THREE; + job.dispatch = dispatch; + + job.message1 = message1; + job.message2 = message2; + job.message3 = message3; try { - this.dispatchQueue.put(take); + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) diff --git a/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java b/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java index 90f808b..03f4a7e 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java +++ b/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java @@ -18,7 +18,7 @@ package dorkbox.messagebus.synchrony; import dorkbox.messagebus.common.NamedThreadFactory; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; -import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.dispatch.Dispatch; import dorkbox.messagebus.synchrony.disruptor.MessageType; import java.util.ArrayDeque; @@ -82,7 +82,7 @@ class AsyncABQ_noGc implements Synchrony { final ErrorHandler errorHandler1 = errorHandler; while (!AsyncABQ_noGc.this.shuttingDown) { - process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1); + process(IN_QUEUE, OUT_QUEUE, errorHandler1); } synchronized (shutdown) { @@ -105,15 +105,12 @@ class AsyncABQ_noGc implements Synchrony { @SuppressWarnings("Duplicates") private void process(final ArrayBlockingQueue queue, - final ArrayBlockingQueue gcQueue, - final Synchrony sync, - final ErrorHandler errorHandler) { + final ArrayBlockingQueue gcQueue, final ErrorHandler errorHandler) { MessageHolder event; int messageType = MessageType.ONE; - Subscription[] subs; - Subscription[] superSubs; + Dispatch dispatch; Object message1 = null; Object message2 = null; Object message3 = null; @@ -122,8 +119,7 @@ class AsyncABQ_noGc implements Synchrony { event = queue.take(); messageType = event.type; - subs = event.subs; - superSubs = event.superSubs; + dispatch = event.dispatch; message1 = event.message1; message2 = event.message2; message3 = event.message3; @@ -132,15 +128,15 @@ class AsyncABQ_noGc implements Synchrony { switch (messageType) { case MessageType.ONE: { - sync.publish(subs, superSubs, message1); + dispatch.publish(message1); return; } case MessageType.TWO: { - sync.publish(subs, superSubs, message1, message2); + dispatch.publish(message1, message2); return; } case MessageType.THREE: { - sync.publish(subs, superSubs, message1, message2, message3); + dispatch.publish(message1, message2, message3); //noinspection UnnecessaryReturnStatement return; } @@ -174,15 +170,16 @@ class AsyncABQ_noGc implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { + void publish(final Dispatch dispatch, final Object message1) { try { - MessageHolder take = gcQueue.take(); + MessageHolder job = gcQueue.take(); - take.type = MessageType.ONE; - take.subs = subscriptions; - take.message1 = message1; + job.type = MessageType.ONE; + job.dispatch = dispatch; - this.dispatchQueue.put(take); + job.message1 = message1; + + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) @@ -192,17 +189,17 @@ class AsyncABQ_noGc implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { + void publish(final Dispatch dispatch, final Object message1, final Object message2) { try { - MessageHolder take = gcQueue.take(); + MessageHolder job = gcQueue.take(); - take.type = MessageType.TWO; - take.subs = subscriptions; - take.superSubs = superSubscriptions; - take.message1 = message1; - take.message2 = message2; + job.type = MessageType.TWO; + job.dispatch = dispatch; - this.dispatchQueue.put(take); + job.message1 = message1; + job.message2 = message2; + + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) @@ -212,18 +209,18 @@ class AsyncABQ_noGc implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { + void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) { try { - MessageHolder take = gcQueue.take(); + MessageHolder job = gcQueue.take(); - take.type = MessageType.THREE; - take.subs = subscriptions; - take.superSubs = superSubscriptions; - take.message1 = message1; - take.message2 = message2; - take.message3 = message3; + job.type = MessageType.THREE; + job.dispatch = dispatch; - this.dispatchQueue.put(take); + job.message1 = message1; + job.message2 = message2; + job.message3 = message3; + + this.dispatchQueue.put(job); } catch (InterruptedException e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") .setCause(e) diff --git a/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java index b791e41..f4b951f 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java +++ b/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java @@ -25,7 +25,8 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkProcessor; import dorkbox.messagebus.common.NamedThreadFactory; import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.dispatch.Dispatch; +import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.synchrony.disruptor.EventBusFactory; import dorkbox.messagebus.synchrony.disruptor.MessageHandler; import dorkbox.messagebus.synchrony.disruptor.MessageType; @@ -53,7 +54,7 @@ class AsyncDisruptor implements Synchrony { private final Sequence workSequence; public - AsyncDisruptor(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) { + AsyncDisruptor(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication, final SubscriptionManager subManager) { // Now we setup the disruptor and work handlers ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, @@ -67,7 +68,7 @@ class AsyncDisruptor implements Synchrony { // setup the work handlers handlers = new MessageHandler[numberOfThreads]; for (int i = 0; i < handlers.length; i++) { - handlers[i] = new MessageHandler(syncPublication); // exactly one per thread is used + handlers[i] = new MessageHandler(); // exactly one per thread is used } @@ -126,13 +127,14 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { + void publish(final Dispatch dispatch, final Object message1) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.ONE; - job.subs = subscriptions; - job.superSubs = superSubscriptions; + job.dispatch = dispatch; + job.message1 = message1; ringBuffer.publish(seq); @@ -140,13 +142,14 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { + void publish(final Dispatch dispatch, final Object message1, final Object message2) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.TWO; - job.subs = subscriptions; - job.superSubs = superSubscriptions; + job.dispatch = dispatch; + job.message1 = message1; job.message2 = message2; @@ -155,13 +158,14 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { + void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.THREE; - job.subs = subscriptions; - job.superSubs = superSubscriptions; + job.dispatch = dispatch; + job.message1 = message1; job.message3 = message2; job.message2 = message3; diff --git a/src/dorkbox/messagebus/synchrony/MessageHolder.java b/src/dorkbox/messagebus/synchrony/MessageHolder.java index 450ecf4..9cc2dbe 100644 --- a/src/dorkbox/messagebus/synchrony/MessageHolder.java +++ b/src/dorkbox/messagebus/synchrony/MessageHolder.java @@ -15,7 +15,7 @@ */ package dorkbox.messagebus.synchrony; -import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.dispatch.Dispatch; import dorkbox.messagebus.synchrony.disruptor.MessageType; /** @@ -24,8 +24,7 @@ import dorkbox.messagebus.synchrony.disruptor.MessageType; public class MessageHolder { public int type = MessageType.ONE; - public Subscription[] subs; - public Subscription[] superSubs; + public Dispatch dispatch = null; public Object message1 = null; public Object message2 = null; diff --git a/src/dorkbox/messagebus/synchrony/Sync.java b/src/dorkbox/messagebus/synchrony/Sync.java index cc5b7d2..0b422ba 100644 --- a/src/dorkbox/messagebus/synchrony/Sync.java +++ b/src/dorkbox/messagebus/synchrony/Sync.java @@ -15,155 +15,35 @@ */ package dorkbox.messagebus.synchrony; -import dorkbox.messagebus.error.DeadMessage; -import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.subscription.Subscription; -import dorkbox.messagebus.subscription.SubscriptionManager; +import dorkbox.messagebus.dispatch.Dispatch; /** - * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. - * - * The exception to this rule is when checking/calling DeadMessage publication. - * * @author dorkbox, llc Date: 2/2/15 */ @SuppressWarnings("Duplicates") public final class Sync implements Synchrony { - private final ErrorHandler errorHandler; - private final SubscriptionManager subManager; - public - Sync(final ErrorHandler errorHandler, final SubscriptionManager subManager) { - this.errorHandler = errorHandler; - this.subManager = subManager; + Sync() { } public - void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { - final ErrorHandler errorHandler = this.errorHandler; - - Subscription sub; - int subLength; - boolean hasSubs = false; - - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1); - } - } - - // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) - if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); - } - } - } + void publish(final Dispatch dispatch, final Object message1) { + dispatch.publish(message1); } @Override public - void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2) { - final ErrorHandler errorHandler = this.errorHandler; - - Subscription sub; - int subLength; - boolean hasSubs = false; - - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2); - } - } - - // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) - if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); - } - } - } + void publish(final Dispatch dispatch, final Object message1, final Object message2) { + dispatch.publish(message1, message2); } @Override public - void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { - final ErrorHandler errorHandler = this.errorHandler; - - Subscription sub; - int subLength; - boolean hasSubs = false; - - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2, message3); - } - } - - // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) - if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2, message3); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); - } - } - } + void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) { + dispatch.publish(message1, message2, message3); } @Override diff --git a/src/dorkbox/messagebus/synchrony/Synchrony.java b/src/dorkbox/messagebus/synchrony/Synchrony.java index 134d98f..086bf4f 100644 --- a/src/dorkbox/messagebus/synchrony/Synchrony.java +++ b/src/dorkbox/messagebus/synchrony/Synchrony.java @@ -15,16 +15,16 @@ */ package dorkbox.messagebus.synchrony; -import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.dispatch.Dispatch; /** * @author dorkbox, llc Date: 2/3/16 */ public interface Synchrony { - void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1); - void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2); - void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2, Object message3); + void publish(Dispatch dispatch, Object message1); + void publish(Dispatch dispatch, Object message1, Object message2); + void publish(Dispatch dispatch, Object message1, Object message2, Object message3); void shutdown(); boolean hasPendingMessages(); diff --git a/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java b/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java index 9203ea3..e383c87 100644 --- a/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java +++ b/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java @@ -17,9 +17,7 @@ package dorkbox.messagebus.synchrony.disruptor; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.WorkHandler; -import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.synchrony.MessageHolder; -import dorkbox.messagebus.synchrony.Synchrony; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,34 +27,28 @@ import java.util.concurrent.atomic.AtomicBoolean; public class MessageHandler implements WorkHandler, LifecycleAware { - private final Synchrony syncPublication; - private final AtomicBoolean shutdown = new AtomicBoolean(false); public - MessageHandler(final Synchrony syncPublication) { - this.syncPublication = syncPublication; + MessageHandler() { } - @SuppressWarnings("Duplicates") @Override public void onEvent(final MessageHolder event) throws Exception { final int messageType = event.type; - final Subscription[] subs = event.subs; - final Subscription[] superSubs = event.superSubs; switch (messageType) { case MessageType.ONE: { - syncPublication.publish(subs, superSubs, event.message1); + event.dispatch.publish(event.message1); return; } case MessageType.TWO: { - syncPublication.publish(subs, superSubs, event.message1, event.message2); + event.dispatch.publish(event.message1, event.message2); return; } case MessageType.THREE: { - syncPublication.publish(subs, superSubs, event.message1, event.message2, event.message3); + event.dispatch.publish(event.message1, event.message2, event.message3); //noinspection UnnecessaryReturnStatement return; } diff --git a/test/dorkbox/util/messagebus/MultiMessageTest.java b/test/dorkbox/util/messagebus/MultiMessageTest.java index d4ee9cc..56e5b45 100644 --- a/test/dorkbox/util/messagebus/MultiMessageTest.java +++ b/test/dorkbox/util/messagebus/MultiMessageTest.java @@ -22,7 +22,7 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSendingExact() { - IMessageBus bus = new MessageBus(IMessageBus.PublishMode.Exact, + IMessageBus bus = new MessageBus(IMessageBus.DispatchMode.Exact, Runtime.getRuntime() .availableProcessors() / 2); MultiListener listener1 = new MultiListener(); @@ -53,7 +53,7 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSendingExactAndSuper() { - IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes, + IMessageBus bus = new MessageBus(IMessageBus.DispatchMode.ExactWithSuperTypes, Runtime.getRuntime() .availableProcessors() / 2); MultiListener listener1 = new MultiListener();