diff --git a/src/dorkbox/util/messagebus/IMessageBus.java b/src/dorkbox/util/messagebus/IMessageBus.java index ab8b4c4..55ec8a4 100644 --- a/src/dorkbox/util/messagebus/IMessageBus.java +++ b/src/dorkbox/util/messagebus/IMessageBus.java @@ -112,23 +112,11 @@ public interface IMessageBus extends PubSubSupport { /** * Will publish to listeners with this exact message signature, as well as listeners that match the super class types signatures. - * and to listeners that have matching varity arguments. (ie: a listener that matches Obeject[], will accept messages of type Object) + * and to listeners that have matching varity arguments. (ie: a listener that matches Object[], will accept messages of type Object) */ - ExactWithSuperTypesAndVarArgs, + ExactWithSuperTypesAndVarity, } - enum SubscribeMode { - /** - * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) - */ - MultiArg, - /** - * Will subscribe and publish using only the FIRST provided parameter in the method signature (for subscribe), and arguments (for publish) - */ - FirstArg, - } - - /** * Check whether any asynchronous message publications are pending to be processed * diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index b5e01d6..4b6dc89 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -30,13 +30,9 @@ import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.publication.Publisher; -import dorkbox.util.messagebus.publication.PublisherAll_MultiArg; -import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_FirstArg; -import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_MultiArg; -import dorkbox.util.messagebus.publication.PublisherExact_FirstArg; -import dorkbox.util.messagebus.publication.PublisherExact_MultiArg; -import dorkbox.util.messagebus.subscription.FirstArgSubscriber; -import dorkbox.util.messagebus.subscription.MultiArgSubscriber; +import dorkbox.util.messagebus.publication.PublisherExact; +import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes; +import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity; import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.utils.ClassUtils; @@ -62,7 +58,7 @@ class MessageBus implements IMessageBus { private final ClassUtils classUtils; private final SubscriptionManager subscriptionManager; - private final Publisher subscriptionPublisher; + private final Publisher publisher; /** * Notifies the consumers during shutdown, that it's on purpose. @@ -74,30 +70,40 @@ class MessageBus implements IMessageBus { private Sequence workSequence; /** - * By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages + * By default, will permit subTypes and Varity Argument matching, and will use half of CPUs available for dispatching async messages */ public MessageBus() { - this(Runtime.getRuntime().availableProcessors()); + this(Runtime.getRuntime().availableProcessors()/2); } /** - * @param numberOfThreads how many threads to have for dispatching async messages + * By default, will permit subTypes and Varity Argument matching + * + * @param numberOfThreads how many threads to use for dispatching async messages */ public MessageBus(int numberOfThreads) { - this(PublishMode.ExactWithSuperTypes, SubscribeMode.FirstArg, numberOfThreads); -// this(PublishMode.ExactWithSuperTypes, SubscribeMode.MultiArg, numberOfThreads); + this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads); } /** + * By default, will use half of CPUs available for dispatching async messages + * * @param publishMode Specifies which publishMode to operate the publication of messages. - * @param numberOfThreads how many threads to have for dispatching async messages */ public - MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) { + MessageBus(final PublishMode publishMode) { + this(publishMode, Runtime.getRuntime().availableProcessors()/2); + } + /** + * @param publishMode Specifies which publishMode to operate the publication of messages. + * @param numberOfThreads how many threads to use for dispatching async messages + */ + public + MessageBus(final PublishMode publishMode, int numberOfThreads) { // round to the nearest power of 2 - numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1)); + numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads))); this.errorHandler = new DefaultErrorHandler(); // this.dispatchQueue = new ArrayBlockingQueue(6); @@ -106,43 +112,25 @@ class MessageBus implements IMessageBus { final StampedLock lock = new StampedLock(); - boolean isMultiArg = subscribeMode == SubscribeMode.MultiArg; final Subscriber subscriber; - if (isMultiArg) { - subscriber = new MultiArgSubscriber(errorHandler, classUtils); - } - else { - subscriber = new FirstArgSubscriber(errorHandler, classUtils); - } + /** + * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) + */ + subscriber = new Subscriber(errorHandler, classUtils); switch (publishMode) { case Exact: - if (isMultiArg) { - subscriptionPublisher = new PublisherExact_MultiArg(errorHandler, subscriber, lock); - } - else { - subscriptionPublisher = new PublisherExact_FirstArg(errorHandler, subscriber, lock); - } + publisher = new PublisherExact(errorHandler, subscriber, lock); break; case ExactWithSuperTypes: - if (isMultiArg) { - subscriptionPublisher = new PublisherExactWithSuperTypes_MultiArg(errorHandler, subscriber, lock); - } - else { - subscriptionPublisher = new PublisherExactWithSuperTypes_FirstArg(errorHandler, subscriber, lock); - } + publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber, lock); break; - case ExactWithSuperTypesAndVarArgs: + case ExactWithSuperTypesAndVarity: default: - if (isMultiArg) { - subscriptionPublisher = new PublisherAll_MultiArg(errorHandler, subscriber, lock); - } - else { - throw new RuntimeException("Unable to run in expected configuration"); - } + publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber, lock); } this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock); @@ -161,7 +149,7 @@ class MessageBus implements IMessageBus { // setup the work handlers handlers = new MessageHandler[numberOfThreads]; for (int i = 0; i < handlers.length; i++) { - handlers[i] = new MessageHandler(subscriptionPublisher); // exactly one per thread is used + handlers[i] = new MessageHandler(publisher); // exactly one per thread is used } @@ -336,25 +324,25 @@ class MessageBus implements IMessageBus { @Override public void publish(final Object message) { - subscriptionPublisher.publish(message); + publisher.publish(message); } @Override public void publish(final Object message1, final Object message2) { - subscriptionPublisher.publish(message1, message2); + publisher.publish(message1, message2); } @Override public void publish(final Object message1, final Object message2, final Object message3) { - subscriptionPublisher.publish(message1, message2, message3); + publisher.publish(message1, message2, message3); } @Override public void publish(final Object[] messages) { - subscriptionPublisher.publish(messages); + publisher.publish(messages); } @Override diff --git a/src/dorkbox/util/messagebus/common/MessageHandler.java b/src/dorkbox/util/messagebus/common/MessageHandler.java index d88f775..964f184 100644 --- a/src/dorkbox/util/messagebus/common/MessageHandler.java +++ b/src/dorkbox/util/messagebus/common/MessageHandler.java @@ -67,10 +67,10 @@ import java.util.Arrays; */ public class MessageHandler { - // publish all listeners defined by the given class (includes // listeners defined in super classes) public static +//cache this? MessageHandler[] get(final Class target) { // publish all handlers (this will include all (inherited) methods directly annotated using @Handler) diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java similarity index 97% rename from src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java rename to src/dorkbox/util/messagebus/publication/PublisherExact.java index 350f0d0..0981677 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java @@ -24,13 +24,13 @@ import dorkbox.util.messagebus.subscription.Subscription; @SuppressWarnings("Duplicates") public -class PublisherExact_MultiArg implements Publisher { +class PublisherExact implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; private final StampedLock lock; public - PublisherExact_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { this.errorHandler = errorHandler; this.subscriber = subscriber; this.lock = lock; diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java similarity index 97% rename from src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java rename to src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java index fc2db09..7748d02 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java @@ -24,13 +24,13 @@ import dorkbox.util.messagebus.subscription.Subscription; @SuppressWarnings("Duplicates") public -class PublisherExactWithSuperTypes_MultiArg implements Publisher { +class PublisherExactWithSuperTypes implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; private final StampedLock lock; public - PublisherExactWithSuperTypes_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { this.errorHandler = errorHandler; this.subscriber = subscriber; this.lock = lock; diff --git a/src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java similarity index 98% rename from src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java rename to src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java index fb90c33..d7030f7 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @SuppressWarnings("Duplicates") public -class PublisherAll_MultiArg implements Publisher { +class PublisherExactWithSuperTypesAndVarity implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; @@ -38,7 +38,7 @@ class PublisherAll_MultiArg implements Publisher { final VarArgUtils varArgUtils; public - PublisherAll_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { this.errorHandler = errorHandler; this.subscriber = subscriber; this.lock = lock; @@ -82,7 +82,7 @@ class PublisherAll_MultiArg implements Publisher { int length = varArgSubs.length; Object[] asArray = null; - if (length > 1) { + if (length > 0) { hasSubs = true; asArray = (Object[]) Array.newInstance(messageClass, 1); @@ -103,7 +103,7 @@ class PublisherAll_MultiArg implements Publisher { length = varArgSuperSubs.length; - if (length > 1) { + if (length > 0) { hasSubs = true; if (asArray == null) { diff --git a/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java b/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java deleted file mode 100644 index 61f1be5..0000000 --- a/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.subscription; - -import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.MessageHandler; -import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.utils.ClassUtils; -import dorkbox.util.messagebus.utils.SubscriptionUtils; -import dorkbox.util.messagebus.utils.VarArgUtils; - -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted - */ -public -class MultiArgSubscriber implements Subscriber { - - private final ErrorHandlingSupport errorHandler; - - private final SubscriptionUtils subUtils; - private final VarArgUtils varArgUtils; - - // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required - // this is the primary list for dispatching a specific message - // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, ArrayList> subscriptionsPerMessageSingle; - private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - - // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); - - public - MultiArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { - this.errorHandler = errorHandler; - - this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); - - this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR); - - // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(classUtils, Subscriber.LOAD_FACTOR); - } - - @Override - public - AtomicBoolean getVarArgPossibility() { - return varArgPossibility; - } - - @Override - public - VarArgUtils getVarArgUtils() { - return varArgUtils; - } - - @Override - public - void clear() { - this.subUtils.clear(); - this.varArgUtils.clear(); - } - - // inside a write lock - // add this subscription to each of the handled types - // to activate this sub for publication - private - void registerMulti(final Subscription subscription, final Class listenerClass, - final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, final AtomicBoolean varArgPossibility) { - - final MessageHandler handler = subscription.getHandler(); - final Class[] messageHandlerTypes = handler.getHandledMessages(); - final int size = messageHandlerTypes.length; - - final Class type0 = messageHandlerTypes[0]; - - switch (size) { - case 0: { - errorHandler.handleError("Error while trying to subscribe class", listenerClass); - return; - } - case 1: { - ArrayList subs = subsPerMessageSingle.get(type0); - if (subs == null) { - subs = new ArrayList(); - - // is this handler able to accept var args? - if (handler.getVarArgClass() != null) { - varArgPossibility.lazySet(true); - } - - subsPerMessageSingle.put(type0, subs); - } - - subs.add(subscription); - return; - } - case 2: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); - } - - subs.add(subscription); - return; - } - case 3: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); - } - - subs.add(subscription); - return; - } - default: { - ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, messageHandlerTypes); - } - - subs.add(subscription); - } - } - } - - @Override - public - void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { - - final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; - final AtomicBoolean varArgPossibility = this.varArgPossibility; - - Subscription subscription; - - for (int i = 0; i < handlersSize; i++) { - subscription = subsPerListener[i]; - - // activate this subscription for publication - // now add this subscription to each of the handled types - registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); - } - } - - @Override - public - void shutdown() { - this.subscriptionsPerMessageSingle.clear(); - this.subscriptionsPerMessageMulti.clear(); - - clear(); - } - - @Override - public - ArrayList getExactAsArray(final Class messageClass) { - return subscriptionsPerMessageSingle.get(messageClass); - } - - @Override - public - ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { - return subscriptionsPerMessageMulti.get(messageClass1, messageClass2); - } - - @Override - public - ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); - } - - // can return null - @Override - public - Subscription[] getExact(final Class messageClass) { - final ArrayList collection = getExactAsArray(messageClass); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - @Override - public - Subscription[] getExact(final Class messageClass1, final Class messageClass2) { - final ArrayList collection = getExactAsArray(messageClass1, messageClass2); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - @Override - public - Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - - final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - @Override - public - Subscription[] getExactAndSuper(final Class messageClass) { - ArrayList collection = getExactAsArray(messageClass); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - @Override - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { - ArrayList collection = getExactAsArray(messageClass1, messageClass2); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, - this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - @Override - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - - ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, - this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } -} diff --git a/src/dorkbox/util/messagebus/subscription/Subscriber.java b/src/dorkbox/util/messagebus/subscription/Subscriber.java index a4ad4a0..4687ada 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscriber.java +++ b/src/dorkbox/util/messagebus/subscription/Subscriber.java @@ -15,42 +15,325 @@ */ package dorkbox.util.messagebus.subscription; +import dorkbox.util.messagebus.common.HashMapTree; +import dorkbox.util.messagebus.common.MessageHandler; +import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.utils.ClassUtils; +import dorkbox.util.messagebus.utils.SubscriptionUtils; import dorkbox.util.messagebus.utils.VarArgUtils; import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +/** + * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted + */ public -interface Subscriber { - float LOAD_FACTOR = 0.8F; +class Subscriber { + public static final float LOAD_FACTOR = 0.8F; - AtomicBoolean getVarArgPossibility(); + private final ErrorHandlingSupport errorHandler; - VarArgUtils getVarArgUtils(); + private final SubscriptionUtils subUtils; + private final VarArgUtils varArgUtils; - void register(Class listenerClass, int handlersSize, Subscription[] subsPerListener); + // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required + // this is the primary list for dispatching a specific message + // write access is synchronized and happens only when a listener of a specific class is registered the first time + private final Map, ArrayList> subscriptionsPerMessageSingle; + private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - void shutdown(); + // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) + private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); - void clear(); - - ArrayList getExactAsArray(Class superClass); - - ArrayList getExactAsArray(Class superClass1, Class superClass2); - - ArrayList getExactAsArray(Class superClass1, Class superClass2, Class superClass3); + private ThreadLocal> listCache = new ThreadLocal>() { + @Override + protected + ArrayList initialValue() { + return new ArrayList(8); + } + }; - Subscription[] getExact(Class deadMessageClass); - Subscription[] getExact(Class messageClass1, Class messageClass2); + public + Subscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { + this.errorHandler = errorHandler; - Subscription[] getExact(Class messageClass1, Class messageClass2, Class messageClass3); + this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); + this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR); - Subscription[] getExactAndSuper(Class messageClass); + // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance of handlers + this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR); + } - Subscription[] getExactAndSuper(Class messageClass1, Class messageClass2); + public + AtomicBoolean getVarArgPossibility() { + return varArgPossibility; + } - Subscription[] getExactAndSuper(Class messageClass1, Class messageClass2, Class messageClass3); + public + VarArgUtils getVarArgUtils() { + return varArgUtils; + } + + public + void clear() { + this.subUtils.clear(); + this.varArgUtils.clear(); + } + + // inside a write lock + // add this subscription to each of the handled types + // to activate this sub for publication + private + void registerMulti(final Subscription subscription, final Class listenerClass, + final Map, ArrayList> subsPerMessageSingle, + final HashMapTree, ArrayList> subsPerMessageMulti, final AtomicBoolean varArgPossibility) { + + final MessageHandler handler = subscription.getHandler(); + final Class[] messageHandlerTypes = handler.getHandledMessages(); + final int size = messageHandlerTypes.length; + + final Class type0 = messageHandlerTypes[0]; + + switch (size) { + case 0: { + // TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method? + errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass); + return; + } + case 1: { + // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types + final ArrayList cachedSubs = listCache.get(); + ArrayList subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs); + if (subs == null) { + listCache.set(new ArrayList(8)); + subs = cachedSubs; + + // is this handler able to accept var args? + if (handler.getVarArgClass() != null) { + varArgPossibility.lazySet(true); + } + } + + subs.add(subscription); + return; + } + case 2: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); + } + + subs.add(subscription); + return; + } + case 3: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); + } + + subs.add(subscription); + return; + } + default: { + ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, messageHandlerTypes); + } + + subs.add(subscription); + } + } + } + + public + void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { + + final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + final AtomicBoolean varArgPossibility = this.varArgPossibility; + + Subscription subscription; + + for (int i = 0; i < handlersSize; i++) { + subscription = subsPerListener[i]; + + // activate this subscription for publication + // now add this subscription to each of the handled types + registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); + } + } + + public + void shutdown() { + this.subscriptionsPerMessageSingle.clear(); + this.subscriptionsPerMessageMulti.clear(); + + clear(); + } + + public + ArrayList getExactAsArray(final Class messageClass) { + return subscriptionsPerMessageSingle.get(messageClass); + } + + public + ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2); + } + + public + ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); + } + + // can return null + public + Subscription[] getExact(final Class messageClass) { + final ArrayList collection = getExactAsArray(messageClass); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExact(final Class messageClass1, final Class messageClass2) { + final ArrayList collection = getExactAsArray(messageClass1, messageClass2); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass) { + ArrayList collection = getExactAsArray(messageClass); // can return null + + // now publish superClasses + final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubscriptions.isEmpty()) { + collection.addAll(superSubscriptions); + } + } + else if (!superSubscriptions.isEmpty()) { + collection = superSubscriptions; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { + ArrayList collection = getExactAsArray(messageClass1, messageClass2); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } } diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index ebfceb1..b762abb 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -39,13 +39,14 @@ package dorkbox.util.messagebus.subscription; import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.util.messagebus.common.MessageHandler; -import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; /** @@ -78,10 +79,20 @@ class Subscription { public Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) { this.handlerMetadata = handler; - this.listeners = new StrongConcurrentSetV8(16, loadFactor, stripeSize); +// this.listeners = new StrongConcurrentSetV8(16, loadFactor, stripeSize); + + ///this is by far, the fastest + this.listeners = new ConcurrentSkipListSet<>(new Comparator() { + @Override + public + int compare(final Object o1, final Object o2) { + return Integer.compare(o1.hashCode(), o2.hashCode()); + } + }); // this.listeners = new StrongConcurrentSet(16, 0.85F); // this.listeners = new ConcurrentLinkedQueue2(); // this.listeners = new CopyOnWriteArrayList(); +// this.listeners = new CopyOnWriteArraySet(); // not very good IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); if (handler.isSynchronized()) { diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index a3c77db..caafe6a 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -130,7 +130,7 @@ class SubscriptionManager { // it was still null, so we actually have to create the rest of the subs if (subscriptions == null) { - subscriber.register(listenerClass, handlersSize, subsPerListener); + subscriber.register(listenerClass, handlersSize, subsPerListener); // this adds to subscriptionsPerMessage subsPerListenerMap.put(listenerClass, subsPerListener); lock.unlockWrite(stamp); diff --git a/test/dorkbox/util/messagebus/MultiMessageTest.java b/test/dorkbox/util/messagebus/MultiMessageTest.java index 76c8663..58690b1 100644 --- a/test/dorkbox/util/messagebus/MultiMessageTest.java +++ b/test/dorkbox/util/messagebus/MultiMessageTest.java @@ -19,7 +19,8 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSending() { - IMessageBus bus = new MessageBus(); + IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypesAndVarity, Runtime.getRuntime().availableProcessors() / 2); +// IMessageBus bus = new MessageBus(); // non varity mode bus.start(); MultiListener listener1 = new MultiListener(); @@ -48,6 +49,7 @@ public class MultiMessageTest extends MessageBusTest { count.set(0); + // test async publication bus.publishAsync("s"); bus.publishAsync("s", "s"); bus.publishAsync("s", "s", "s"); @@ -58,59 +60,7 @@ public class MultiMessageTest extends MessageBusTest { while (bus.hasPendingMessages()) { try { Thread.sleep(ConcurrentUnits); - } catch (InterruptedException e) { - } - } - - assertEquals(13, count.get()); - - - bus.shutdown(); - } - - @Test - public void testFirstArgMultiMessageSending() { - IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes, IMessageBus.SubscribeMode.FirstArg, - Runtime.getRuntime().availableProcessors() / 2); - bus.start(); - - FirstListener listener = new FirstListener(); - bus.subscribe(listener); - bus.unsubscribe(listener); - - bus.publish("s"); - bus.publish("s", "s"); - bus.publish("s", "s", "s"); - bus.publish(1, "s"); - bus.publish(1, 2, "s"); - bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); - - assertEquals(0, count.get()); - - bus.subscribe(listener); - - bus.publish("s"); // 4 - bus.publish("s", "s"); // 3 - bus.publish("s", "s", "s"); // 3 - bus.publish(1, "s"); // 1 - bus.publish(1, 2, "s"); // 2 - bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2 - - assertEquals(15, count.get()); - count.set(0); - - - bus.publishAsync("s"); - bus.publishAsync("s", "s"); - bus.publishAsync("s", "s", "s"); - bus.publish(1, "s"); - bus.publishAsync(1, 2, "s"); - bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6}); - - while (bus.hasPendingMessages()) { - try { - Thread.sleep(ConcurrentUnits); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } @@ -170,53 +120,4 @@ public class MultiMessageTest extends MessageBusTest { System.err.println("match Object[]"); } } - public static class FirstListener { - @Handler - public void handleSync(Object o) { - count.getAndIncrement(); - System.err.println("match Object"); - } - - @Handler - public void handleSync(String o1) { - count.getAndIncrement(); - System.err.println("match String"); - } - - @Handler - public void handleSync(String o1, String o2) { - count.getAndIncrement(); - System.err.println("match String, String"); - } - -// @Handler -// public void handleSync(String o1, String o2, String o3) { -// count.getAndIncrement(); -// System.err.println("match String, String, String"); -// } -// -// @Handler -// public void handleSync(Integer o1, Integer o2, String o3) { -// count.getAndIncrement(); -// System.err.println("match Integer, Integer, String"); -// } -// -// @Handler(acceptVarargs = true) -// public void handleSync(String... o) { -// count.getAndIncrement(); -// System.err.println("match String[]"); -// } -// -// @Handler -// public void handleSync(Integer... o) { -// count.getAndIncrement(); -// System.err.println("match Integer[]"); -// } -// -// @Handler(acceptVarargs = true) -// public void handleSync(Object... o) { -// count.getAndIncrement(); -// System.err.println("match Object[]"); -// } - } } diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 3598c35..248b386 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -28,7 +28,6 @@ import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.listeners.*; import dorkbox.util.messagebus.messages.*; -import dorkbox.util.messagebus.subscription.MultiArgSubscriber; import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.utils.ClassUtils; @@ -160,7 +159,7 @@ public class SubscriptionManagerTest extends AssertSupport { final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); final StampedLock lock = new StampedLock(); final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils); + final Subscriber subscriber = new Subscriber(errorHandler, classUtils); SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); @@ -184,7 +183,7 @@ public class SubscriptionManagerTest extends AssertSupport { final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); final StampedLock lock = new StampedLock(); final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils); + final Subscriber subscriber = new Subscriber(errorHandler, classUtils); final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); diff --git a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java index 53d4584..705e623 100644 --- a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -25,7 +25,13 @@ package dorkbox.util.messagebus.common; import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; -import java.util.*; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; /** * @author bennidi