From 1287612685f6eca4fb3f19a8b553beff6d6e0f68 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 7 Jun 2015 23:25:14 +0200 Subject: [PATCH] Added back var arg 2/3, vararg super 2/3. Added unit test for this. --- .../dorkbox/util/messagebus/IMessageBus.java | 6 +- .../util/messagebus/common/ClassUtils.java | 183 ++++++++++ .../util/messagebus/common/HashMapTree.java | 2 +- .../messagebus/common/MessageHandler.java | 15 +- .../messagebus/common/SuperClassUtils.java | 96 ------ .../util/messagebus/common/VarArgUtils.java | 124 +++---- .../messagebus/subscription/Subscription.java | 37 +-- .../subscription/SubscriptionManager.java | 314 ++++++++++-------- .../subscription/SubscriptionUtils.java | 60 ++-- .../util/messagebus/AsyncFIFOBusTest.java | 12 +- .../util/messagebus/MBassadorTest.java | 17 +- .../util/messagebus/MultiMessageTest.java | 12 +- .../util/messagebus/ObjectTreeTest.java | 5 +- .../dorkbox/util/messagebus/SyncBusTest.java | 13 +- .../messagebus/common/MessageBusTest.java | 8 +- 15 files changed, 475 insertions(+), 429 deletions(-) create mode 100644 src/main/java/dorkbox/util/messagebus/common/ClassUtils.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java diff --git a/src/main/java/dorkbox/util/messagebus/IMessageBus.java b/src/main/java/dorkbox/util/messagebus/IMessageBus.java index 5888e4c..200d701 100644 --- a/src/main/java/dorkbox/util/messagebus/IMessageBus.java +++ b/src/main/java/dorkbox/util/messagebus/IMessageBus.java @@ -4,7 +4,11 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; /** * A message bus offers facilities for publishing messages to the message handlers of registered listeners. - * A message publication starts when an object is send to the bus using one of the its publication methods. + *

+ * + * Because the message bus keeps track of classes that are subscribed and published, reloading the classloader means that you will need to + * SHUTDOWN the messagebus when you unload the classloader, and then re-subscribe relevant classes when you reload the classes. + *

* * Messages can be published synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T. * Message handlers can be invoked synchronously or asynchronously depending on their configuration. Thus, there diff --git a/src/main/java/dorkbox/util/messagebus/common/ClassUtils.java b/src/main/java/dorkbox/util/messagebus/common/ClassUtils.java new file mode 100644 index 0000000..f5cbc17 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/ClassUtils.java @@ -0,0 +1,183 @@ +package dorkbox.util.messagebus.common; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Map; + +public final class ClassUtils { + + private final Map, Class> arrayCache; + private final Map, Class[]> superClassesCache; + + public ClassUtils(final float loadFactor) { + this.arrayCache = new ConcurrentHashMapV8, Class>(32, loadFactor, 1); + this.superClassesCache = new ConcurrentHashMapV8, Class[]>(32, loadFactor, 1); + } + + /** + * never returns null + * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) + *

+ * if parameter clazz is of type array, then the super classes are of array type as well + *

+ * protected by read lock by caller. The cache version is called first, by write lock + */ + public Class[] getSuperClasses(final Class clazz) { + // this is never reset, since it never needs to be. + final Map, Class[]> local = this.superClassesCache; + + Class[] classes = local.get(clazz); + + if (classes == null) { + // publish all super types of class + final Class[] superTypes = ReflectionUtils.getSuperTypes(clazz); + final int length = superTypes.length; + + final ArrayList> newList = new ArrayList>(length); + + Class c; + final boolean isArray = clazz.isArray(); + + if (isArray) { + for (int i = 0; i < length; i++) { + c = superTypes[i]; + + c = getArrayClass(c); + + if (c != clazz) { + newList.add(c); + } + } + } + else { + for (int i = 0; i < length; i++) { + c = superTypes[i]; + + if (c != clazz) { + newList.add(c); + } + } + } + + classes = new Class[newList.size()]; + newList.toArray(classes); + local.put(clazz, classes); + } + + return classes; + } + + /** + * race conditions will result in duplicate answers, which we don't care if happens + * never returns null + * never reset + */ + public Class getArrayClass(final Class c) { + final Map, Class> arrayCache = this.arrayCache; + Class clazz = arrayCache.get(c); + + if (clazz == null) { + // messy, but the ONLY way to do it. Array super types are also arrays + final Object[] newInstance = (Object[]) Array.newInstance(c, 1); + clazz = newInstance.getClass(); + arrayCache.put(c, clazz); + } + + return clazz; + } + + + /** + * Clears the caches + */ + public void clear() { + this.arrayCache.clear(); + this.superClassesCache.clear(); + } + + public static ArrayList findCommon(final T[] arrayOne, final T[] arrayTwo) { + + T[] arrayToHash; + T[] arrayToSearch; + + final int size1 = arrayOne.length; + final int size2 = arrayTwo.length; + + final int hashSize; + final int searchSize; + + if (size1 < size2) { + hashSize = size1; + searchSize = size2; + arrayToHash = arrayOne; + arrayToSearch = arrayTwo; + } + else { + hashSize = size2; + searchSize = size1; + arrayToHash = arrayTwo; + arrayToSearch = arrayOne; + } + + + final ArrayList intersection = new ArrayList(searchSize); + + final HashSet hashedArray = new HashSet(); + for (int i = 0; i < hashSize; i++) { + T t = arrayToHash[i]; + hashedArray.add(t); + } + + for (int i = 0; i < searchSize; i++) { + T t = arrayToSearch[i]; + if (hashedArray.contains(t)) { + intersection.add(t); + } + } + + return intersection; + } + + public static ArrayList findCommon(final ArrayList arrayOne, final ArrayList arrayTwo) { + + ArrayList arrayToHash; + ArrayList arrayToSearch; + + final int size1 = arrayOne.size(); + final int size2 = arrayTwo.size(); + + final int hashSize; + final int searchSize; + + if (size1 < size2) { + hashSize = size1; + searchSize = size2; + arrayToHash = arrayOne; + arrayToSearch = arrayTwo; + } + else { + hashSize = size2; + searchSize = size1; + arrayToHash = arrayTwo; + arrayToSearch = arrayOne; + } + + ArrayList intersection = new ArrayList(searchSize); + + HashSet hashedArray = new HashSet(); + for (int i = 0; i < hashSize; i++) { + T t = arrayToHash.get(i); + hashedArray.add(t); + } + + for (int i = 0; i < searchSize; i++) { + T t = arrayToSearch.get(i); + if (hashedArray.contains(t)) { + intersection.add(t); + } + } + + return intersection; + } +} diff --git a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java index 402f49a..445a1f9 100644 --- a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java +++ b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java @@ -328,7 +328,7 @@ public class HashMapTree { return tree.value; } - public final VALUE getValue(KEY key1, KEY key2, KEY key3) { + public final VALUE get(KEY key1, KEY key2, KEY key3) { HashMapTree tree; // publish value from our children tree = getLeaf(key1); diff --git a/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java index aa082da..d10065c 100644 --- a/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java +++ b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java @@ -74,7 +74,7 @@ public class MessageHandler { private final Class[] handledMessages; private final boolean acceptsSubtypes; - private final boolean acceptsVarArgs; + private final Class varArgClass; private final boolean isSynchronized; @@ -94,7 +94,12 @@ public class MessageHandler { this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null; this.handledMessages = handledMessages; - this.acceptsVarArgs = handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs(); + if (handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs()) { + this.varArgClass = handledMessages[0].getComponentType(); + } + else { + this.varArgClass = null; + } } public final boolean isSynchronized() { @@ -113,12 +118,16 @@ public class MessageHandler { return this.handledMessages; } + public final Class getVarArgClass() { + return this.varArgClass; + } + public final boolean acceptsSubtypes() { return this.acceptsSubtypes; } public final boolean acceptsVarArgs() { - return this.acceptsVarArgs; + return this.varArgClass != null; } @Override diff --git a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java b/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java deleted file mode 100644 index 12d8a3e..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java +++ /dev/null @@ -1,96 +0,0 @@ -package dorkbox.util.messagebus.common; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Map; - -public class SuperClassUtils { - - private final Map, Class> versionCache; - private final Map, Class[]> superClassesCache; - - public SuperClassUtils(float loadFactor, int stripeSize) { - this.versionCache = new ConcurrentHashMapV8, Class>(32, loadFactor, stripeSize); - this.superClassesCache = new ConcurrentHashMapV8, Class[]>(32, loadFactor, stripeSize); - } - - /** - * never returns null - * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) - *

- * if parameter clazz is of type array, then the super classes are of array type as well - *

- * protected by read lock by caller. The cache version is called first, by write lock - */ - public final Class[] getSuperClasses(final Class clazz) { - // this is never reset, since it never needs to be. - final Map, Class[]> local = this.superClassesCache; - - Class[] classes = local.get(clazz); - - if (classes == null) { - // publish all super types of class - final Class[] superTypes = ReflectionUtils.getSuperTypes(clazz); - final int length = superTypes.length; - - ArrayList> newList = new ArrayList>(length); - - Class c; - final boolean isArray = clazz.isArray(); - - if (isArray) { - for (int i = 0; i < length; i++) { - c = superTypes[i]; - - c = getArrayClass(c); - - if (c != clazz) { - newList.add(c); - } - } - } - else { - for (int i = 0; i < length; i++) { - c = superTypes[i]; - - if (c != clazz) { - newList.add(c); - } - } - } - - classes = new Class[newList.size()]; - newList.toArray(classes); - local.put(clazz, classes); - } - - return classes; - } - - /** - * race conditions will result in duplicate answers, which we don't care if happens - * never returns null - * never reset - */ - public final Class getArrayClass(final Class c) { - final Map, Class> versionCache = this.versionCache; - Class clazz = versionCache.get(c); - - if (clazz == null) { - // messy, but the ONLY way to do it. Array super types are also arrays - final Object[] newInstance = (Object[]) Array.newInstance(c, 1); - clazz = newInstance.getClass(); - versionCache.put(c, clazz); - } - - return clazz; - } - - /** - * Clears the caches on shutdown - */ - public final void shutdown() { - this.versionCache.clear(); - this.superClassesCache.clear(); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index 7d5a042..a4d9e5f 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -1,57 +1,50 @@ package dorkbox.util.messagebus.common; -import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionUtils; import java.util.ArrayList; import java.util.Map; public final class VarArgUtils { - private final Map, ArrayList> varArgSubscriptions; - private final Map, ArrayList> varArgSuperClassSubscriptions; - private final HashMapTree, ArrayList> varArgSuperClassSubscriptionsMulti; + private final Map, ArrayList> varArgSubscriptionsSingle; + private final HashMapTree, ArrayList> varArgSubscriptionsMulti; - private final SubscriptionHolder subHolderConcurrent; + private final Map, ArrayList> varArgSuperSubscriptionsSingle; + private final HashMapTree, ArrayList> varArgSuperSubscriptionsMulti; - private final float loadFactor; - private final int stripeSize; - - private final SubscriptionUtils utils; - private final SuperClassUtils superClassUtils; + private final ClassUtils superClassUtils; private final Map, ArrayList> subscriptionsPerMessageSingle; - public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils, - Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) { + public VarArgUtils(final ClassUtils superClassUtils, final Map, ArrayList> subscriptionsPerMessageSingle, + final float loadFactor, final int stripeSize) { - this.utils = utils; this.superClassUtils = superClassUtils; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; - this.loadFactor = loadFactor; - this.stripeSize = stripeSize; - this.varArgSubscriptions = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); - this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); - this.varArgSuperClassSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); + this.varArgSubscriptionsSingle = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); + this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); - this.subHolderConcurrent = new SubscriptionHolder(); + this.varArgSuperSubscriptionsSingle = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); + this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); } public void clear() { - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); - this.varArgSuperClassSubscriptionsMulti.clear(); + this.varArgSubscriptionsSingle.clear(); + this.varArgSubscriptionsMulti.clear(); + + this.varArgSuperSubscriptionsSingle.clear(); + this.varArgSuperSubscriptionsMulti.clear(); } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions - public Subscription[] getVarArgSubscriptions(Class messageClass) { + public Subscription[] getVarArgSubscriptions(final Class messageClass) { // whenever our subscriptions change, this map is cleared. - final Map, ArrayList> local = this.varArgSubscriptions; + final Map, ArrayList> local = this.varArgSubscriptionsSingle; ArrayList varArgSubs = local.get(messageClass); @@ -68,7 +61,7 @@ public final class VarArgUtils { for (int i = 0; i < length; i++) { sub = subs.get(i); - if (sub.acceptsVarArgs()) { + if (sub.getHandler().acceptsVarArgs()) { varArgSubs.add(sub); } } @@ -84,6 +77,7 @@ public final class VarArgUtils { + // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions @@ -95,9 +89,10 @@ public final class VarArgUtils { return subscriptions; } + // CAN NOT RETURN NULL private ArrayList getVarArgSuperSubscriptions_List(final Class messageClass) { // whenever our subscriptions change, this map is cleared. - final Map, ArrayList> local = this.varArgSuperClassSubscriptions; + final Map, ArrayList> local = this.varArgSuperSubscriptionsSingle; ArrayList varArgSuperSubs = local.get(messageClass); @@ -119,6 +114,7 @@ public final class VarArgUtils { Subscription sub; ArrayList subs; int length; + MessageHandler handlerMetadata; for (int i = 0; i < typesLength; i++) { type = types[i]; @@ -131,7 +127,8 @@ public final class VarArgUtils { for (int j = 0; j < length; j++) { sub = subs.get(j); - if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { + handlerMetadata = sub.getHandler(); + if (handlerMetadata.acceptsSubtypes() && handlerMetadata.acceptsVarArgs()) { varArgSuperSubs.add(sub); } } @@ -151,42 +148,24 @@ public final class VarArgUtils { // and then, returns the array'd version superclass subscriptions public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2) { // whenever our subscriptions change, this map is cleared. - final HashMapTree, ArrayList> local = this.varArgSuperClassSubscriptionsMulti; + final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; - HashMapTree, ArrayList> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); - ArrayList subsPerType; + ArrayList subs = local.get(messageClass1, messageClass2); - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } - else { + if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); - final int size1 = varargSuperSubscriptions1.size(); - final int size2 = varargSuperSubscriptions2.size(); + subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); - subsPerType = new ArrayList(size1 + size2); - - Subscription sub; - for (int i = 0; i < size1; i++) { - sub = varargSuperSubscriptions1.get(i); - - if (varargSuperSubscriptions2.contains(sub)) { - subsPerType.add(sub); - } - } - - subsPerType.trimToSize(); - - local.put(subsPerType, messageClass1, messageClass2); + subs.trimToSize(); + local.put(subs, messageClass1, messageClass2); } - final Subscription[] subscriptions = new Subscription[subsPerType.size()]; - subsPerType.toArray(subscriptions); + final Subscription[] subscriptions = new Subscription[subs.size()]; + subs.toArray(subscriptions); return subscriptions; } @@ -197,43 +176,28 @@ public final class VarArgUtils { public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { // whenever our subscriptions change, this map is cleared. - final HashMapTree, ArrayList> local = this.varArgSuperClassSubscriptionsMulti; + final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; - HashMapTree, ArrayList> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); - ArrayList subsPerType; + ArrayList subs = local.get(messageClass1, messageClass2, messageClass3); - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } - else { + if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3); - final int size1 = varargSuperSubscriptions1.size(); - final int size2 = varargSuperSubscriptions2.size(); + subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); + subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3); - subsPerType = new ArrayList(size1 + size2); - - Subscription sub; - for (int i = 0; i < size1; i++) { - sub = varargSuperSubscriptions1.get(i); - - if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { - subsPerType.add(sub); - } - } - - subsPerType.trimToSize(); - - local.put(subsPerType, messageClass1, messageClass2, messageClass3); + subs.trimToSize(); + local.put(subs, messageClass1, messageClass2, messageClass3); } - final Subscription[] subscriptions = new Subscription[subsPerType.size()]; - subsPerType.toArray(subscriptions); + final Subscription[] subscriptions = new Subscription[subs.size()]; + subs.toArray(subscriptions); return subscriptions; } + + } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 4ff9105..6bd22ab 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author dorkbox, llc * Date: 2/2/15 */ -public class Subscription { +public final class Subscription { private static final AtomicInteger ID_COUNTER = new AtomicInteger(); public final int ID = ID_COUNTER.getAndIncrement(); @@ -56,43 +56,31 @@ public class Subscription { this.invocation = invocation; } - public final MessageHandler getHandlerMetadata() { + public MessageHandler getHandler() { return handlerMetadata; } - public final Class[] getHandledMessageTypes() { - return this.handlerMetadata.getHandledMessages(); - } - - public final boolean acceptsSubtypes() { - return this.handlerMetadata.acceptsSubtypes(); - } - - public final boolean acceptsVarArgs() { - return this.handlerMetadata.acceptsVarArgs(); - } - - public final boolean isEmpty() { + public boolean isEmpty() { return this.listeners.isEmpty(); } - public final void subscribe(Object listener) { + public void subscribe(Object listener) { this.listeners.add(listener); } /** * @return TRUE if the element was removed */ - public final boolean unsubscribe(Object existingListener) { + public boolean unsubscribe(Object existingListener) { return this.listeners.remove(existingListener); } // only used in unit-test - public final int size() { + public int size() { return this.listeners.size(); } - public final void publish(final Object message) throws Throwable { + public void publish(final Object message) throws Throwable { final MethodAccess handler = this.handlerMetadata.getHandler(); final int handleIndex = this.handlerMetadata.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; @@ -107,7 +95,7 @@ public class Subscription { } } - public final void publish(final Object message1, final Object message2) throws Throwable { + public void publish(final Object message1, final Object message2) throws Throwable { final MethodAccess handler = this.handlerMetadata.getHandler(); final int handleIndex = this.handlerMetadata.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; @@ -122,7 +110,7 @@ public class Subscription { } } - public final void publish(final Object message1, final Object message2, final Object message3) throws Throwable { + public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { final MethodAccess handler = this.handlerMetadata.getHandler(); final int handleIndex = this.handlerMetadata.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; @@ -137,7 +125,7 @@ public class Subscription { } } - public final void publishToSubscription(final Object... messages) throws Throwable { + public void publishToSubscription(final Object... messages) throws Throwable { final MethodAccess handler = this.handlerMetadata.getHandler(); final int handleIndex = this.handlerMetadata.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; @@ -154,12 +142,12 @@ public class Subscription { @Override - public final int hashCode() { + public int hashCode() { return this.ID; } @Override - public final boolean equals(Object obj) { + public boolean equals(Object obj) { if (this == obj) { return true; } @@ -233,7 +221,6 @@ public class Subscription { } subs.add(this); - return; } } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 3b69f32..60d03c6 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -29,7 +29,6 @@ import java.util.concurrent.locks.StampedLock; public final class SubscriptionManager { private static final float LOAD_FACTOR = 0.8F; - private final SubscriptionUtils utils; // remember already processed classes that do not contain any message handlers private final Map, Boolean> nonListeners; @@ -49,7 +48,8 @@ public final class SubscriptionManager { // once a collection of subscriptions is stored it does not change private final ConcurrentMap, Subscription[]> subscriptionsPerListener; - + private final ClassUtils classUtils; + private final SubscriptionUtils subUtils; private final VarArgUtils varArgUtils; private final StampedLock lock = new StampedLock(); @@ -69,14 +69,14 @@ public final class SubscriptionManager { this.subscriptionsPerListener = new ConcurrentHashMapV8, Subscription[]>(32, LOAD_FACTOR, 1); } - final SuperClassUtils superClass = new SuperClassUtils(LOAD_FACTOR, 1); + classUtils = new ClassUtils(LOAD_FACTOR); - this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, LOAD_FACTOR, - numberOfThreads); + this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, + LOAD_FACTOR, numberOfThreads); // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads); + this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads); } public void shutdown() { @@ -88,7 +88,7 @@ public final class SubscriptionManager { clearConcurrentCollections(); - this.utils.shutdown(); + this.classUtils.clear(); } public void subscribe(final Object listener) { @@ -144,7 +144,6 @@ public final class SubscriptionManager { final ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; final AtomicBoolean varArgPossibility = this.varArgPossibility; - final SubscriptionUtils utils = this.utils; // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // of the huge number of reads compared to writes. @@ -211,7 +210,7 @@ public final class SubscriptionManager { private void clearConcurrentCollections() { - this.utils.clear(); + this.subUtils.clear(); this.varArgUtils.clear(); } @@ -326,7 +325,7 @@ public final class SubscriptionManager { ArrayList collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null // now publish superClasses - final ArrayList superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null + final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null if (collection != null) { collection = new ArrayList(collection); @@ -355,7 +354,7 @@ public final class SubscriptionManager { ArrayList collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null // now publish superClasses - final ArrayList superSubs = this.utils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null if (collection != null) { collection = new ArrayList(collection); @@ -386,7 +385,7 @@ public final class SubscriptionManager { .get(messageClass1, messageClass2, messageClass3); // can return null // now publish superClasses - final ArrayList superSubs = this.utils + final ArrayList superSubs = this.subUtils .getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // NOT return null if (collection != null) { @@ -594,80 +593,79 @@ public final class SubscriptionManager { final StampedLock lock = this.lock; long stamp = lock.readLock(); - final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null - lock.unlockRead(stamp); - + boolean hasSubs = false; // Run subscriptions if (subscriptions != null) { + hasSubs = true; + Subscription sub; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; sub.publish(message); } - - // publish to var arg, only if not already an array - if (varArgPossibility.get() && !isArray) { - stamp = lock.readLock(); - final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); // can return null - lock.unlockRead(stamp); - - if (varArgSubscriptions != null) { - final int length = varArgSubscriptions.length; - Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); - asArray[0] = message; + } - for (int i = 0; i < length; i++) { - sub = varArgSubscriptions[i]; - sub.publish(asArray); - } + // publish to var arg, only if not already an array (because that would be unnecessary) + if (varArgPossibility.get() && !isArray) { + stamp = lock.readLock(); + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL + lock.unlockRead(stamp); - stamp = lock.readLock(); - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass); - lock.unlockRead(stamp); + Subscription sub; + int length = varArgSubs.length; + Object[] asArray = null; - if (varArgSuperSubscriptions != null) { - for (int i = 0; i < length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } - } - else { - stamp = lock.readLock(); + if (length > 1) { + hasSubs = true; - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass); - lock.unlockRead(stamp); + asArray = (Object[]) Array.newInstance(messageClass, 1); + asArray[0] = message; - if (varArgSuperSubscriptions != null) { - Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); - asArray[0] = message; - - for (int i = 0; i < varArgSuperSubscriptions.length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } + for (int i = 0; i < length; i++) { + sub = varArgSubs[i]; + sub.publish(asArray); + } + } + + + // now publish array based superClasses (but only if those ALSO accept vararg) + stamp = lock.readLock(); + final Subscription[] varArgSuperSubs = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);// CAN NOT RETURN NULL + lock.unlockRead(stamp); + + length = varArgSuperSubs.length; + + if (length > 1) { + hasSubs = true; + + if (asArray == null) { + asArray = (Object[]) Array.newInstance(messageClass, 1); + asArray[0] = message; + } + + for (int i = 0; i < length; i++) { + sub = varArgSuperSubs[i]; + sub.publish(asArray); } } - return; } // only get here if there were no other subscriptions // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message); + if (!hasSubs) { + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message); - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } } } } @@ -676,67 +674,77 @@ public final class SubscriptionManager { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2); // can return null + lock.unlockRead(stamp); + boolean hasSubs = false; // Run subscriptions if (subscriptions != null) { + hasSubs = true; + Subscription sub; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; sub.publish(message1, message2); } + } - // publish to var arg, only if not already an array AND we are all of the same type - if (varArgPossibility.get() && messageClass1 == messageClass2 && !messageClass1.isArray()) { - long stamp = lock.readLock(); - final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null + // publish to var arg, only if not already an array AND we are all of the same type + if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray()) { + + // vararg can ONLY work if all types are the same + if (messageClass1 == messageClass2) { + stamp = lock.readLock(); + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null lock.unlockRead(stamp); - if (varArgSubscriptions != null) { - final int length = varArgSubscriptions.length; + final int length = varArgSubs.length; + if (length > 0) { + hasSubs = true; + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray[0] = message1; asArray[1] = message2; - + Subscription sub; for (int i = 0; i < length; i++) { - sub = varArgSubscriptions[i]; + sub = varArgSubs[i]; sub.publish(asArray); } - - stamp = lock.readLock(); - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); - lock.unlockRead(stamp); - - if (varArgSuperSubscriptions != null) { - for (int i = 0; i < length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } - } - else { - stamp = lock.readLock(); - - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); - lock.unlockRead(stamp); - - if (varArgSuperSubscriptions != null) { - Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); - asArray[0] = message1; - asArray[1] = message2; - - for (int i = 0; i < varArgSuperSubscriptions.length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } + } + } + + // now publish array based superClasses (but only if those ALSO accept vararg) + stamp = lock.readLock(); + final Subscription[] varArgSuperSubs = this.varArgUtils + .getVarArgSuperSubscriptions(messageClass1, messageClass2); // CAN NOT RETURN NULL + lock.unlockRead(stamp); + + + final int length = varArgSuperSubs.length; + if (length > 0) { + hasSubs = true; + + Class arrayType; + Object[] asArray; + + Subscription sub; + for (int i = 0; i < length; i++) { + sub = varArgSuperSubs[i]; + arrayType = sub.getHandler().getVarArgClass(); + + asArray = (Object[]) Array.newInstance(arrayType, 2); + asArray[0] = message1; + asArray[1] = message2; + + sub.publish(asArray); } } } - else { + + if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { @@ -756,69 +764,81 @@ public final class SubscriptionManager { final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subs = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3); // can return null + lock.unlockRead(stamp); + + boolean hasSubs = false; // Run subscriptions - if (subscriptions != null) { + if (subs != null) { + hasSubs = true; + Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; + for (int i = 0; i < subs.length; i++) { + sub = subs[i]; sub.publish(message1, message2, message3); } + } - // publish to var arg, only if not already an array AND we are all of the same type - if (varArgPossibility.get() && messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) { - long stamp = lock.readLock(); - final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null + // publish to var arg, only if not already an array AND we are all of the same type + if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray() && !messageClass3.isArray()) { + + // vararg can ONLY work if all types are the same + if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { + stamp = lock.readLock(); + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null lock.unlockRead(stamp); - if (varArgSubscriptions != null) { - final int length = varArgSubscriptions.length; + final int length = varArgSubs.length; + if (length > 0) { + hasSubs = true; + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray[0] = message1; asArray[1] = message2; asArray[2] = message3; - + Subscription sub; for (int i = 0; i < length; i++) { - sub = varArgSubscriptions[i]; + sub = varArgSubs[i]; sub.publish(asArray); } - - stamp = lock.readLock(); - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); - lock.unlockRead(stamp); - - if (varArgSuperSubscriptions != null) { - for (int i = 0; i < length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } - } - else { - stamp = lock.readLock(); - - // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); - lock.unlockRead(stamp); - - if (varArgSuperSubscriptions != null) { - Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); - asArray[0] = message1; - asArray[1] = message2; - asArray[2] = message3; - - for (int i = 0; i < varArgSuperSubscriptions.length; i++) { - sub = varArgSuperSubscriptions[i]; - sub.publish(asArray); - } - } + } + } + + + // now publish array based superClasses (but only if those ALSO accept vararg) + stamp = lock.readLock(); + final Subscription[] varArgSuperSubs = this.varArgUtils + .getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // CAN NOT RETURN NULL + lock.unlockRead(stamp); + + + final int length = varArgSuperSubs.length; + if (length > 0) { + hasSubs = true; + + Class arrayType; + Object[] asArray; + + Subscription sub; + for (int i = 0; i < length; i++) { + sub = varArgSuperSubs[i]; + arrayType = sub.getHandler().getVarArgClass(); + + asArray = (Object[]) Array.newInstance(arrayType, 3); + asArray[0] = message1; + asArray[1] = message2; + asArray[2] = message3; + + sub.publish(asArray); } } } - else { + + if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java index afeccc0..4b4cc1d 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java @@ -1,8 +1,8 @@ package dorkbox.util.messagebus.subscription; +import dorkbox.util.messagebus.common.ClassUtils; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.SuperClassUtils; import dorkbox.util.messagebus.common.thread.ClassHolder; import dorkbox.util.messagebus.common.thread.SubscriptionHolder; @@ -10,7 +10,7 @@ import java.util.ArrayList; import java.util.Map; public final class SubscriptionUtils { - private final SuperClassUtils superClass; + private final ClassUtils superClass; private final ClassHolder classHolderSingle; @@ -28,9 +28,9 @@ public final class SubscriptionUtils { private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - public SubscriptionUtils(SuperClassUtils superClass, Map, ArrayList> subscriptionsPerMessageSingle, - HashMapTree, ArrayList> subscriptionsPerMessageMulti, float loadFactor, - int stripeSize) { + public SubscriptionUtils(final ClassUtils superClass, Map, ArrayList> subscriptionsPerMessageSingle, + final HashMapTree, ArrayList> subscriptionsPerMessageMulti, final float loadFactor, + final int stripeSize) { this.superClass = superClass; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; @@ -50,11 +50,7 @@ public final class SubscriptionUtils { public void clear() { this.superClassSubscriptions.clear(); - } - - - public void shutdown() { - this.superClass.shutdown(); + this.superClassSubscriptionsMulti.clear(); } @@ -71,9 +67,9 @@ public final class SubscriptionUtils { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.superClassSubscriptions; - ArrayList superSubscriptions = local.get(clazz); + ArrayList subs = local.get(clazz); - if (superSubscriptions == null) { + if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them final Map, ArrayList> local2 = this.subscriptionsPerMessageSingle; @@ -86,7 +82,7 @@ public final class SubscriptionUtils { final int length = superClasses.length; int superSubLength; - superSubscriptions = new ArrayList(length); + subs = new ArrayList(length); for (int i = 0; i < length; i++) { superClass = superClasses[i]; @@ -97,18 +93,18 @@ public final class SubscriptionUtils { for (int j = 0; j < superSubLength; j++) { sub = superSubs.get(j); - if (sub.acceptsSubtypes()) { - superSubscriptions.add(sub); + if (sub.getHandler().acceptsSubtypes()) { + subs.add(sub); } } } } - superSubscriptions.trimToSize(); - local.put(clazz, superSubscriptions); + subs.trimToSize(); + local.put(clazz, subs); } - return superSubscriptions; + return subs; } /** @@ -122,12 +118,11 @@ public final class SubscriptionUtils { */ public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2) { // whenever our subscriptions change, this map is cleared. - HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; + final HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; - HashMapTree, ArrayList> subsLeaf = local.getLeaf(clazz1, clazz2); - ArrayList subs; + ArrayList subs = local.get(clazz1, clazz2); - if (subsLeaf == null) { + if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them final HashMapTree, ArrayList> local2 = this.subscriptionsPerMessageMulti; @@ -166,7 +161,7 @@ public final class SubscriptionUtils { for (int k = 0; k < superSubs.size(); k++) { sub = superSubs.get(k); - if (sub.acceptsSubtypes()) { + if (sub.getHandler().acceptsSubtypes()) { subs.add(sub); } } @@ -176,10 +171,6 @@ public final class SubscriptionUtils { subs.trimToSize(); local.put(subs, clazz1, clazz2); } - else { - // if the leaf exists, then the value exists. - subs = subsLeaf.getValue(); - } return subs; } @@ -193,14 +184,13 @@ public final class SubscriptionUtils { * * @return CAN NOT RETURN NULL */ - public ArrayList getSuperSubscriptions(Class clazz1, Class clazz2, Class clazz3) { + public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Class clazz3) { // whenever our subscriptions change, this map is cleared. - HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; + final HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; - HashMapTree, ArrayList> subsLeaf = local.getLeaf(clazz1, clazz2, clazz3); - ArrayList subs; + ArrayList subs = local.get(clazz1, clazz2, clazz3); - if (subsLeaf == null) { + if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them final HashMapTree, ArrayList> local2 = this.subscriptionsPerMessageMulti; @@ -250,7 +240,7 @@ public final class SubscriptionUtils { for (int m = 0; m < superSubs.size(); m++) { sub = superSubs.get(m); - if (sub.acceptsSubtypes()) { + if (sub.getHandler().acceptsSubtypes()) { subs.add(sub); } } @@ -261,10 +251,6 @@ public final class SubscriptionUtils { subs.trimToSize(); local.put(subs, clazz1, clazz2, clazz3); } - else { - // if the leaf exists, then the value exists. - subs = subsLeaf.getValue(); - } return subs; } diff --git a/src/test/java/dorkbox/util/messagebus/AsyncFIFOBusTest.java b/src/test/java/dorkbox/util/messagebus/AsyncFIFOBusTest.java index 3a20cfb..de36f46 100644 --- a/src/test/java/dorkbox/util/messagebus/AsyncFIFOBusTest.java +++ b/src/test/java/dorkbox/util/messagebus/AsyncFIFOBusTest.java @@ -1,15 +1,12 @@ package dorkbox.util.messagebus; +import dorkbox.util.messagebus.annotations.Handler; +import dorkbox.util.messagebus.common.MessageBusTest; +import org.junit.Test; + import java.util.LinkedList; import java.util.List; -import org.junit.Test; - -import dorkbox.util.messagebus.IMessageBus; -import dorkbox.util.messagebus.MultiMBassador; -import dorkbox.util.messagebus.annotations.Handler; -import dorkbox.util.messagebus.common.MessageBusTest; - /** * * @author bennidi @@ -21,6 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { public void testSingleThreadedSyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners IMessageBus fifoBUs = new MultiMBassador(); + fifoBUs.start(); List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ diff --git a/src/test/java/dorkbox/util/messagebus/MBassadorTest.java b/src/test/java/dorkbox/util/messagebus/MBassadorTest.java index 8e93042..53291a9 100644 --- a/src/test/java/dorkbox/util/messagebus/MBassadorTest.java +++ b/src/test/java/dorkbox/util/messagebus/MBassadorTest.java @@ -1,15 +1,6 @@ package dorkbox.util.messagebus; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import dorkbox.util.messagebus.MultiMBassador; -import dorkbox.util.messagebus.common.ConcurrentExecutor; -import dorkbox.util.messagebus.common.ListenerFactory; -import dorkbox.util.messagebus.common.MessageBusTest; -import dorkbox.util.messagebus.common.MessageManager; -import dorkbox.util.messagebus.common.TestUtil; +import dorkbox.util.messagebus.common.*; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.listeners.ExceptionThrowingListener; @@ -19,6 +10,9 @@ import dorkbox.util.messagebus.listeners.MessagesListener; import dorkbox.util.messagebus.messages.MessageTypes; import dorkbox.util.messagebus.messages.MultipartMessage; import dorkbox.util.messagebus.messages.StandardMessage; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; /** * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. @@ -108,6 +102,7 @@ public class MBassadorTest extends MessageBusTest { final MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(ExceptionCounter); + bus.start(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); @@ -134,8 +129,8 @@ public class MBassadorTest extends MessageBusTest { while (bus.hasPendingMessages()) { pause(10); } - assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); + assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); bus.shutdown(); } diff --git a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java index 9833bb1..31080b3 100644 --- a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java +++ b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java @@ -20,6 +20,7 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSending(){ IMessageBus bus = new MultiMBassador(); + bus.start(); Listener listener1 = new Listener(); bus.subscribe(listener1); @@ -39,11 +40,11 @@ public class MultiMessageTest extends MessageBusTest { bus.publish("s"); // 4 bus.publish("s", "s"); // 3 bus.publish("s", "s", "s"); // 3 - bus.publish(1, "s"); - bus.publish(1, 2, "s"); - bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); + bus.publish(1, "s"); // 1 + bus.publish(1, 2, "s"); // 2 + bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2 - assertEquals(13, count.get()); + assertEquals(15, count.get()); count.set(0); @@ -58,9 +59,6 @@ public class MultiMessageTest extends MessageBusTest { try { Thread.sleep(ConcurrentUnits); } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - // log.error(e); } } diff --git a/src/test/java/dorkbox/util/messagebus/ObjectTreeTest.java b/src/test/java/dorkbox/util/messagebus/ObjectTreeTest.java index 5d8d124..5642d3a 100644 --- a/src/test/java/dorkbox/util/messagebus/ObjectTreeTest.java +++ b/src/test/java/dorkbox/util/messagebus/ObjectTreeTest.java @@ -15,10 +15,9 @@ */ package dorkbox.util.messagebus; -import org.junit.Test; - import dorkbox.util.messagebus.common.AssertSupport; import dorkbox.util.messagebus.common.HashMapTree; +import org.junit.Test; public class ObjectTreeTest extends AssertSupport { @@ -34,7 +33,7 @@ public class ObjectTreeTest extends AssertSupport { public void test(HashMapTree, String> tree, String string, Class clazz1, Class clazz2, Class clazz3) { tree.put(string, clazz1, clazz2, clazz3); - assertEquals(string, tree.getValue(clazz1, clazz2, clazz3)); + assertEquals(string, tree.get(clazz1, clazz2, clazz3)); } public void test(HashMapTree, String> tree, String string, Class... clazzes) { diff --git a/src/test/java/dorkbox/util/messagebus/SyncBusTest.java b/src/test/java/dorkbox/util/messagebus/SyncBusTest.java index 9c1bc8b..36dcf5b 100644 --- a/src/test/java/dorkbox/util/messagebus/SyncBusTest.java +++ b/src/test/java/dorkbox/util/messagebus/SyncBusTest.java @@ -1,12 +1,5 @@ package dorkbox.util.messagebus; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; -import org.junit.Test; - -import dorkbox.util.messagebus.IMessageBus; -import dorkbox.util.messagebus.MultiMBassador; import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ListenerFactory; import dorkbox.util.messagebus.common.MessageBusTest; @@ -19,6 +12,10 @@ import dorkbox.util.messagebus.listeners.MessagesListener; import dorkbox.util.messagebus.messages.MessageTypes; import dorkbox.util.messagebus.messages.MultipartMessage; import dorkbox.util.messagebus.messages.StandardMessage; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; /** * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. @@ -32,6 +29,7 @@ public class SyncBusTest extends MessageBusTest { public void testSynchronousMessagePublication() throws Exception { final IMessageBus bus = new MultiMBassador(); + bus.start(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class) @@ -86,6 +84,7 @@ public class SyncBusTest extends MessageBusTest { final IMessageBus bus = new MultiMBassador(); bus.addErrorHandler(ExceptionCounter); + bus.start(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); diff --git a/src/test/java/dorkbox/util/messagebus/common/MessageBusTest.java b/src/test/java/dorkbox/util/messagebus/common/MessageBusTest.java index c1df9e8..108cb8f 100644 --- a/src/test/java/dorkbox/util/messagebus/common/MessageBusTest.java +++ b/src/test/java/dorkbox/util/messagebus/common/MessageBusTest.java @@ -1,13 +1,11 @@ package dorkbox.util.messagebus.common; -import junit.framework.Assert; - -import org.junit.Before; - import dorkbox.util.messagebus.MultiMBassador; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.messages.MessageTypes; +import junit.framework.Assert; +import org.junit.Before; /** * A base test that provides a factory for message bus that makes tests fail if any @@ -44,6 +42,7 @@ public abstract class MessageBusTest extends AssertSupport { public MultiMBassador createBus() { MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(TestFailingHandler); + bus.start(); return bus; } @@ -51,6 +50,7 @@ public abstract class MessageBusTest extends AssertSupport { MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(TestFailingHandler); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); + bus.start(); return bus; } }