diff --git a/src/main/java/dorkbox/util/messagebus/IMessageBus.java b/src/main/java/dorkbox/util/messagebus/IMessageBus.java index fd01d21..4382ca1 100644 --- a/src/main/java/dorkbox/util/messagebus/IMessageBus.java +++ b/src/main/java/dorkbox/util/messagebus/IMessageBus.java @@ -14,7 +14,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; *

* Each message publication is isolated from all other running publications such that it does not interfere with them. * Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple - * messages getSubscriptions published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked + * messages publish published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked * in a synchronized fashion using @Synchronized annotation * *

@@ -25,7 +25,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; *

* By default, the bus uses weak references to all listeners such that registered listeners do not need to * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are - * removed on-the-fly as messages getSubscriptions dispatched. This can be changed using the @Listener annotation. + * removed on-the-fly as messages publish dispatched. This can be changed using the @Listener annotation. * *

* Generally message handlers will be invoked in inverse sequence of subscription but any @@ -50,7 +50,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; * *

* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will - * getSubscriptions dispatched to all message handlers that take an instance of List as their parameter + * publish dispatched to all message handlers that take an instance of List as their parameter * * @Author bennidi * Date: 2/8/12 diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 2d26b59..5719848 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -73,21 +73,20 @@ public class MultiMBassador implements IMessageBus { if (forceExactMatches) { subscriptionMatcher = new Matcher() { @Override - public Subscription[] getSubscriptions(Class messageClass) { - return subscriptionManager.getSubscriptionsForcedExact(messageClass); + public boolean publish(Object message) throws Throwable { + return subscriptionManager.publishExact(message); } }; } else { subscriptionMatcher = new Matcher() { @Override - public Subscription[] getSubscriptions(Class messageClass) { - return subscriptionManager.getSubscriptions(messageClass); + public boolean publish(Object message) throws Throwable { + return subscriptionManager.publish(message); } }; } - this.threads = new ArrayDeque(numberOfThreads); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); @@ -101,6 +100,7 @@ public class MultiMBassador implements IMessageBus { MultiNode node = new MultiNode(); while (!MultiMBassador.this.shuttingDown) { try { + //noinspection InfiniteLoopStatement while (true) { IN_QUEUE.take(node); switch (node.messageType) { @@ -112,7 +112,6 @@ public class MultiMBassador implements IMessageBus { continue; case 3: publish(node.item1, node.item2, node.item3); - continue; } } } catch (InterruptedException e) { @@ -120,22 +119,21 @@ public class MultiMBassador implements IMessageBus { switch (node.messageType) { case 1: { handlePublicationError( - new PublicationError().setMessage("Thread interupted while processing message") + new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e).setPublishedObject(node.item1)); continue; } case 2: { handlePublicationError( - new PublicationError().setMessage("Thread interupted while processing message") + new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e).setPublishedObject(node.item1, node.item2)); continue; } case 3: { handlePublicationError( - new PublicationError().setMessage("Thread interupted while processing message") + new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e) .setPublishedObject(node.item1, node.item2, node.item3)); - continue; } } } @@ -202,97 +200,19 @@ public class MultiMBassador implements IMessageBus { return this.dispatchQueue.hasPendingMessages(); } - - @Override public void publish(final Object message) { try { - boolean subsPublished = false; - final SubscriptionManager manager = this.subscriptionManager; - final Class messageClass = message.getClass(); + boolean subsPublished = subscriptionMatcher.publish(message); - Subscription[] subscriptions; - Subscription sub; - - subscriptions = subscriptionMatcher.getSubscriptions(messageClass); - int c = 0; - // Run subscriptions - if (subscriptions != null) { - subsPublished = true; - - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message); - c = sub.c(); - } - } - - -// if (!this.forceExactMatches) { -// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null -// // now getSubscriptions superClasses -// int length = superSubscriptions.length; -// -// if (length > 0) { -// for (int i=0;i varargSubscriptions = manager.getVarArgSubscriptions(messageClass); -// if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { -// asArray = (Object[]) Array.newInstance(messageClass, 1); -// asArray[0] = message; - // -// for (iterator = varargSubscriptions.iterator(); iterator.hasNext();) { -// sub = iterator.next(); -// // this catches all exception types -// sub.publishToSubscription(this, subsPublished, asArray); -// } -// } - // -// ConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); -// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) -// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { -// if (asArray == null) { -// asArray = (Object[]) Array.newInstance(messageClass, 1); -// asArray[0] = message; -// } -// for (iterator = varargSuperSubscriptions.iterator(); iterator.hasNext();) { -// sub = iterator.next(); -// // this catches all exception types -// sub.publishToSubscription(this, subsPublished, asArray); -// } -// } -// } -// } - - if (c == 0 && !subsPublished) { + if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) - Subscription[] deadSubscriptions = manager.getSubscriptionsForcedExact(DeadMessage.class); + Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsForcedExact(DeadMessage.class); if (deadSubscriptions != null) { DeadMessage deadMessage = new DeadMessage(message); + Subscription sub; + //noinspection ForLoopReplaceableByForEach for (int i = 0; i < deadSubscriptions.length; i++) { sub = deadSubscriptions[i]; @@ -314,7 +234,7 @@ public class MultiMBassador implements IMessageBus { // Class messageClass2 = message2.getClass(); // // StrongConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); -// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions(); +// BooleanHolder subsPublished = this.booleanThreadLocal.publish(); // subsPublished.bool = false; // // ISetEntry current; @@ -334,7 +254,7 @@ public class MultiMBassador implements IMessageBus { // // if (!this.forceExactMatches) { // StrongConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); -// // now getSubscriptions superClasses +// // now publish superClasses // if (superSubscriptions != null) { // current = superSubscriptions.head; // while (current != null) { @@ -368,7 +288,7 @@ public class MultiMBassador implements IMessageBus { // } // // StrongConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); -// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) +// // now publish array based superClasses (but only if those ALSO accept vararg) // if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { // if (asArray == null) { // asArray = (Object[]) Array.newInstance(messageClass1, 2); @@ -388,7 +308,7 @@ public class MultiMBassador implements IMessageBus { // } else { // StrongConcurrentSet varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); // -// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) +// // now publish array based superClasses (but only if those ALSO accept vararg) // if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { // current = varargSuperMultiSubscriptions.head; // while (current != null) { @@ -438,7 +358,7 @@ public class MultiMBassador implements IMessageBus { // Class messageClass3 = message3.getClass(); // // StrongConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); -// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions(); +// BooleanHolder subsPublished = this.booleanThreadLocal.publish(); // subsPublished.bool = false; // // ISetEntry current; @@ -459,7 +379,7 @@ public class MultiMBassador implements IMessageBus { // // if (!this.forceExactMatches) { // StrongConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); -// // now getSubscriptions superClasses +// // now publish superClasses // if (superSubscriptions != null) { // current = superSubscriptions.head; // while (current != null) { @@ -493,7 +413,7 @@ public class MultiMBassador implements IMessageBus { // } // // StrongConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); -// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) +// // now publish array based superClasses (but only if those ALSO accept vararg) // if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { // if (asArray == null) { // asArray = (Object[]) Array.newInstance(messageClass1, 3); @@ -514,7 +434,7 @@ public class MultiMBassador implements IMessageBus { // } else { // StrongConcurrentSet varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // -// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) +// // now publish array based superClasses (but only if those ALSO accept vararg) // if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { // current = varargSuperMultiSubscriptions.head; // while (current != null) { diff --git a/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java b/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java index f6f37e1..16ea565 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java +++ b/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java @@ -27,14 +27,14 @@ import java.util.concurrent.ForkJoinPool; * interoperable with {@code Hashtable} in programs that rely on its * thread safety but not on its synchronization details. * - *

Retrieval operations (including {@code getSubscriptions}) generally do not + *

Retrieval operations (including {@code get}) generally do not * block, so may overlap with update operations (including {@code put} * and {@code remove}). Retrievals reflect the results of the most * recently completed update operations holding upon their * onset. (More formally, an update operation for a given key bears a * happens-before relation with any (non-null) retrieval for * that key reporting the updated value.) For aggregate operations - * such as {@code putAll} and {@code shutdown}, concurrent retrievals may + * such as {@code putAll} and {@code clear}, concurrent retrievals may * reflect insertion or removal of only some entries. Similarly, * Iterators and Enumerations return elements reflecting the state of * the hash table at some point at or since the creation of the @@ -141,7 +141,7 @@ import java.util.concurrent.ForkJoinPool; * *

The concurrency properties of bulk operations follow * from those of ConcurrentHashMapV8: Any non-null result returned - * from {@code getSubscriptions(key)} and related access methods bears a + * from {@code get(key)} and related access methods bears a * happens-before relation with the associated insertion or * update. The result of any bulk operation reflects the * composition of these per-element relations (but is not @@ -275,7 +275,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // * Overview: // * // * The primary design goal of this hash table is to maintain -// * concurrent readability (typically method getSubscriptions(), but also +// * concurrent readability (typically method publish(), but also // * iterators and related methods) while minimizing update // * contention. Secondary goals are to keep space consumption about // * the same or better than java.util.HashMap, and to support high @@ -632,7 +632,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // } // // /** -// * Virtualized support for map.getSubscriptions(); overridden in subclasses. +// * Virtualized support for map.publish(); overridden in subclasses. // */ // Node find(int h, Object k) { // Node e = this; @@ -931,7 +931,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // * @throws NullPointerException if the specified key is null // */ // @Override -// public V getSubscriptions(Object key) { +// public V publish(Object key) { // Node[] tab; Node e, p; int n, eh; K ek; // int h = spread(key.hashCode()); // if ((tab = this.table) != null && (n = tab.length) > 0 && @@ -965,7 +965,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // */ // @Override // public boolean containsKey(Object key) { -// return getSubscriptions(key) != null; +// return publish(key) != null; // } // // /** @@ -1000,7 +1000,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // * Maps the specified key to the specified value in this table. // * Neither the key nor the value can be null. // * -// *

The value can be retrieved by calling the {@code getSubscriptions} method +// *

The value can be retrieved by calling the {@code publish} method // * with a key that is equal to the original key. // * // * @param key key with which the specified value is to be associated @@ -1381,7 +1381,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // Traverser it = new Traverser(t, f, 0, f); // for (Node p; (p = it.advance()) != null; ) { // V val = p.val; -// Object v = m.getSubscriptions(p.key); +// Object v = m.publish(p.key); // if (v == null || v != val && !v.equals(val)) { // return false; // } @@ -1390,7 +1390,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // Object mk, mv, v; // if ((mk = e.getKey()) == null || // (mv = e.getValue()) == null || -// (v = getSubscriptions(mk)) == null || +// (v = publish(mk)) == null || // mv != v && !mv.equals(v)) { // return false; // } @@ -1628,7 +1628,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // */ // public V getOrDefault(Object key, V defaultValue) { // V v; -// return (v = getSubscriptions(key)) == null ? defaultValue : v; +// return (v = publish(key)) == null ? defaultValue : v; // } // // public void forEach(BiAction action) { @@ -1659,7 +1659,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // throw new NullPointerException(); // } // if (replaceNode(key, newValue, oldValue) != null || -// (oldValue = getSubscriptions(key)) == null) { +// (oldValue = publish(key)) == null) { // break; // } // } @@ -2328,7 +2328,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // !U.compareAndSwapLong(this, BASECOUNT, b = this.baseCount, s = b + x)) { // CounterHashCode hc; CounterCell a; long v; int m; // boolean uncontended = true; -// if ((hc = threadCounterHashCode.getSubscriptions()) == null || +// if ((hc = threadCounterHashCode.publish()) == null || // as == null || (m = as.length - 1) < 0 || // (a = as[m & hc.code]) == null || // !(uncontended = @@ -4037,7 +4037,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // Object k, v, r; Map.Entry e; // return o instanceof Map.Entry && // (k = (e = (Map.Entry)o).getKey()) != null && -// (r = this.map.getSubscriptions(k)) != null && +// (r = this.map.publish(k)) != null && // (v = e.getValue()) != null && // (v == r || v.equals(r)); // } @@ -4328,7 +4328,7 @@ public class ConcurrentHashMapV8 extends ConcurrentHashMap // Class k = sun.misc.Unsafe.class; // for (java.lang.reflect.Field f : k.getDeclaredFields()) { // f.setAccessible(true); -// Object x = f.getSubscriptions(null); +// Object x = f.publish(null); // if (k.isInstance(x)) { // return k.cast(x); // } diff --git a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java index c90e308..84bdc73 100644 --- a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java +++ b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java @@ -9,7 +9,7 @@ import java.util.concurrent.locks.Lock; /** - * Simple tree structure that is a map that contains a chain of keys to getSubscriptions to a value. + * Simple tree structure that is a map that contains a chain of keys to publish to a value. *

* THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes * @@ -506,7 +506,7 @@ public class HashMapTree { READ.lock(); // allows other readers, blocks others from acquiring update or write locks HashMapTree objectTree = null; - // getSubscriptions value from our children + // publish value from our children objectTree = getLeaf_NL(key); // protected by lock if (objectTree == null) { @@ -525,7 +525,7 @@ public class HashMapTree { READ.lock(); // allows other readers, blocks others from acquiring update or write locks HashMapTree tree = null; - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(key1); // protected by lock if (tree != null) { tree = tree.getLeaf_NL(key2); // protected by lock @@ -547,7 +547,7 @@ public class HashMapTree { READ.lock(); // allows other readers, blocks others from acquiring update or write locks HashMapTree tree = null; - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(key1); if (tree != null) { tree = tree.getLeaf_NL(key2); @@ -573,7 +573,7 @@ public class HashMapTree { READ.lock(); // allows other readers, blocks others from acquiring update or write locks HashMapTree tree = null; - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(keys[0]); int size = keys.length; @@ -625,7 +625,7 @@ public class HashMapTree { Lock READ = this.lock.readLock(); READ.lock(); // allows other readers, blocks others from acquiring update or write locks - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(key1); if (tree != null) { tree = tree.getLeaf_NL(key2); @@ -642,7 +642,7 @@ public class HashMapTree { Lock READ = this.lock.readLock(); READ.lock(); // allows other readers, blocks others from acquiring update or write locks - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(key1); if (tree != null) { tree = tree.getLeaf_NL(key2); @@ -668,7 +668,7 @@ public class HashMapTree { READ.lock(); // allows other readers, blocks others from acquiring update or write locks HashMapTree tree = null; - // getSubscriptions value from our children + // publish value from our children tree = getLeaf_NL(keys[0]); for (int i=1;i target) { - // getSubscriptions all handlers (this will include all (inherited) methods directly annotated using @Handler) + // publish all handlers (this will include all (inherited) methods directly annotated using @Handler) final Method[] allMethods = ReflectionUtils.getMethods(target); final int length = allMethods.length; diff --git a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java b/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java index a9a0324..b9f58da 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java @@ -29,7 +29,7 @@ public class SuperClassUtils { Class[] classes = local.get(clazz); if (classes == null) { - // getSubscriptions all super types of class + // publish all super types of class final Class[] superTypes = ReflectionUtils.getSuperTypes(clazz); final int length = superTypes.length; diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgPossibility.java b/src/main/java/dorkbox/util/messagebus/common/VarArgPossibility.java deleted file mode 100644 index 951f52b..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgPossibility.java +++ /dev/null @@ -1,49 +0,0 @@ -package dorkbox.util.messagebus.common; - -import org.jctools.util.UnsafeAccess; - -abstract class VarArgPossibility_P0 { - // pre-padding - volatile long y0, y1, y2, y4, y5, y6 = 7L; -} - -abstract class VarArgPossibility_I0 extends VarArgPossibility_P0 { - public boolean hasVarArgPossibility = false; -} - -public class VarArgPossibility extends VarArgPossibility_I0 { - private static final long BOOL; - - static { - try { - BOOL = UnsafeAccess.UNSAFE.objectFieldOffset(VarArgPossibility.class.getField("hasVarArgPossibility")); - - // now make sure we can access UNSAFE - VarArgPossibility bool = new VarArgPossibility(); - boolean o = true; - bool.set( o); - boolean lpItem1 = bool.get(); - bool.set(false); - - if (lpItem1 != o) { - throw new Exception("Cannot access unsafe fields"); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public final void set(boolean bool) { - UnsafeAccess.UNSAFE.putBoolean(this, BOOL, bool); - } - - public final boolean get() { - return UnsafeAccess.UNSAFE.getBoolean(this, BOOL); - } - - // post-padding - volatile long z0, z1, z2, z4, z5, z6 = 7L; - - public VarArgPossibility() { - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index fbbd81e..573c94a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -6,12 +6,12 @@ import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.SubscriptionUtils; import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentMap; public class VarArgUtils { - private final ConcurrentMap, ConcurrentSet> varArgSubscriptions; - private final ConcurrentMap, ConcurrentSet> varArgSuperClassSubscriptions; + private final Map, ArrayList> varArgSubscriptions; + private final Map, List> varArgSuperClassSubscriptions; private final HashMapTree, ConcurrentSet> varArgSuperClassSubscriptionsMulti; private final SubscriptionHolder subHolderConcurrent; @@ -20,19 +20,22 @@ public class VarArgUtils { private final int stripeSize; private final SubscriptionUtils utils; + private final SuperClassUtils superClassUtils; private final Map, ArrayList> subscriptionsPerMessageSingle; - public VarArgUtils(SubscriptionUtils utils, Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, + public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils, + Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) { this.utils = utils; + this.superClassUtils = superClassUtils; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; this.loadFactor = loadFactor; this.stripeSize = stripeSize; - this.varArgSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(16, loadFactor, stripeSize); - this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(16, loadFactor, stripeSize); + this.varArgSubscriptions = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); + this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, List>(16, loadFactor, stripeSize); this.varArgSuperClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, loadFactor); this.subHolderConcurrent = new SubscriptionHolder(); @@ -49,12 +52,38 @@ public class VarArgUtils { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSubscriptions(Class messageClass) { -// ConcurrentMap, ConcurrentSet> local = this.varArgSubscriptions; -// -// // whenever our subscriptions change, this map is cleared. + public ArrayList getVarArgSubscriptions(Class messageClass) { + Map, ArrayList> local = this.varArgSubscriptions; + + ArrayList varArgSubs = local.get(messageClass); + + if (varArgSubs == null) { + // this gets (and caches) our array type. This is never cleared. + final Class arrayVersion = this.superClassUtils.getArrayClass(messageClass); + + ArrayList subs = this.subscriptionsPerMessageSingle.get(arrayVersion); + if (subs != null) { + int length = subs.size(); + varArgSubs = new ArrayList(length); + + Subscription sub; + for (int i = 0; i < length; i++) { + sub = subs.get(i); + + if (sub.acceptsVarArgs()) { + varArgSubs.add(sub); + } + } + + local.put(messageClass, varArgSubs); + } + } + + return varArgSubs; + + // whenever our subscriptions change, this map is cleared. // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// ConcurrentSet subsPerType = subHolderConcurrent.getSubscriptions(); +// ConcurrentSet subsPerType = subHolderConcurrent.publish(); // // // cache our subscriptions for super classes, so that their access can be fast! // ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); @@ -62,13 +91,11 @@ public class VarArgUtils { // // we are the first one in the map // subHolderConcurrent.set(subHolderConcurrent.initialValue()); // -// // this caches our array type. This is never cleared. -// Class arrayVersion = this.utils.getArrayClass(messageClass); // // Iterator iterator; // Subscription sub; // -// Collection subs = this.subscriptionsPerMessageSingle.getSubscriptions(arrayVersion); +// Collection subs = this.subscriptionsPerMessageSingle.publish(arrayVersion); // if (subs != null) { // for (iterator = subs.iterator(); iterator.hasNext();) { // sub = iterator.next(); @@ -83,7 +110,7 @@ public class VarArgUtils { // return putIfAbsent; // } - return null; +// return null; } @@ -96,7 +123,7 @@ public class VarArgUtils { // ConcurrentMap, ConcurrentSet> local = this.varArgSuperClassSubscriptions; // // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// ConcurrentSet subsPerType = subHolderConcurrent.getSubscriptions(); +// ConcurrentSet subsPerType = subHolderConcurrent.publish(); // // // cache our subscriptions for super classes, so that their access can be fast! // ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); @@ -123,7 +150,7 @@ public class VarArgUtils { // for (iterator = types.iterator(); iterator.hasNext();) { // superClass = iterator.next(); // -// Collection subs = local2.getSubscriptions(superClass); +// Collection subs = local2.publish(superClass); // if (subs != null) { // for (subIterator = subs.iterator(); subIterator.hasNext();) { // sub = subIterator.next(); @@ -159,7 +186,7 @@ public class VarArgUtils { // subsPerType = subsPerTypeLeaf.getValue(); // } else { // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// subsPerType = subHolderConcurrent.getSubscriptions(); +// subsPerType = subHolderConcurrent.publish(); // // ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); // if (putIfAbsent != null) { @@ -207,7 +234,7 @@ public class VarArgUtils { // subsPerType = subsPerTypeLeaf.getValue(); // } else { // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// subsPerType = subHolderConcurrent.getSubscriptions(); +// subsPerType = subHolderConcurrent.publish(); // // ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); // if (putIfAbsent != null) { diff --git a/src/main/java/dorkbox/util/messagebus/common/item.java b/src/main/java/dorkbox/util/messagebus/common/item.java deleted file mode 100644 index 55919b6..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/item.java +++ /dev/null @@ -1,5 +0,0 @@ -package dorkbox.util.messagebus.common; - -public abstract class item { - public volatile Entry head; // reference to the first element -} diff --git a/src/main/java/dorkbox/util/messagebus/common/item3.java b/src/main/java/dorkbox/util/messagebus/common/item3.java deleted file mode 100644 index 4fb286b..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/item3.java +++ /dev/null @@ -1,11 +0,0 @@ -package dorkbox.util.messagebus.common; - -import java.util.Iterator; - -abstract class item3 implements Iterator { - public ISetEntry current; - - public item3(ISetEntry current) { - this.current = current; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java index 795da92..1cd5386 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java @@ -432,7 +432,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue { // Successful CAS: full barrier final Thread myThread = Thread.currentThread(); -// final Object node = nodeThreadLocal.getSubscriptions(); +// final Object node = nodeThreadLocal.publish(); spType(node, TYPE_CONSUMER); spThread(node, myThread); diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java index 94eec3a..eabe8e1 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java @@ -39,7 +39,7 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { return false; } - // had to modify the super implementation so we getSubscriptions Node back + // had to modify the super implementation so we publish Node back Node alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER); if (alreadyPresent == null) { // this doesn't already exist diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java index 4420a15..7ac2870 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java @@ -1,5 +1,5 @@ package dorkbox.util.messagebus.subscription; public interface Matcher { - Subscription[] getSubscriptions(Class messageClass); + boolean publish(Object messageClass) throws Throwable; } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index cdd1be3..9a158c4 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Date: 2/2/15 */ public class Subscription { - private static AtomicInteger ID_COUNTER = new AtomicInteger(); + private static final AtomicInteger ID_COUNTER = new AtomicInteger(); public final int ID = ID_COUNTER.getAndIncrement(); @@ -86,11 +86,6 @@ public class Subscription { return this.listeners.size(); } - int c = 0; - public int c() { - return this.c; - } - /** * @return true if there were listeners for this publication, false if there was nothing */ @@ -105,7 +100,6 @@ public class Subscription { for (iterator = this.listeners.iterator(); iterator.hasNext();) { listener = iterator.next(); -// this.c++; invocation.invoke(listener, handler, handleIndex, message); } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 9435af4..b6c5ac6 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -1,12 +1,14 @@ package dorkbox.util.messagebus.subscription; import dorkbox.util.messagebus.common.*; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.StampedLock; /** @@ -37,7 +39,7 @@ public class SubscriptionManager { private final Map, Boolean> nonListeners; // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - private final VarArgPossibility varArgPossibility = new VarArgPossibility(); + private AtomicBoolean varArgPossibility = new AtomicBoolean(false); // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message @@ -52,9 +54,11 @@ public class SubscriptionManager { private final ConcurrentMap, Subscription[]> subscriptionsPerListener; + private final SuperClassUtils superClass; private final VarArgUtils varArgUtils; + private final StampedLock lock = new StampedLock(); // private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -72,12 +76,13 @@ public class SubscriptionManager { this.subscriptionsPerListener = new ConcurrentHashMapV8, Subscription[]>(); } - this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, + this.superClass = new SuperClassUtils(loadFactor, 1); + this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads); // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); + this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); } public void shutdown() { @@ -91,10 +96,6 @@ public class SubscriptionManager { clearConcurrentCollections(); } - public boolean hasVarArgPossibility() { - return this.varArgPossibility.get(); - } - public void subscribe(Object listener) { if (listener == null) { return; @@ -138,8 +139,6 @@ public class SubscriptionManager { return; } - - VarArgPossibility varArgPossibility = this.varArgPossibility; Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; @@ -160,7 +159,7 @@ public class SubscriptionManager { // now add this subscription to each of the handled types subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, - subsPerMessageMulti, varArgPossibility); + subsPerMessageMulti); subsForPublication.add(subscription); // activates this sub for publication } @@ -177,7 +176,7 @@ public class SubscriptionManager { } // subscriptions already exist and must only be updated - // only getSubscriptions here if our single-check was OK, or our double-check was OK + // only publish here if our single-check was OK, or our double-check was OK Subscription subscription; for (int i = 0; i < subscriptions.length; i++) { @@ -190,8 +189,7 @@ public class SubscriptionManager { // also puts it into the correct map if it's not already there private Collection getSubsForPublication(final Class[] messageHandlerTypes, final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, - final VarArgPossibility varArgPossibility) { + final HashMapTree, ArrayList> subsPerMessageMulti) { final int size = messageHandlerTypes.length; @@ -208,11 +206,11 @@ public class SubscriptionManager { boolean isArray = type0.isArray(); if (isArray) { - varArgPossibility.set(true); + varArgPossibility.lazySet(true); } // cache the super classes -// utils.cacheSuperClasses(type0, isArray); + //utils.cacheSuperClasses(type0, isArray); subsPerMessageSingle.put(type0, subs); } @@ -222,7 +220,7 @@ public class SubscriptionManager { case 2: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time // SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.getSubscriptions(); +// subsPerType = subHolderSingle.publish(); // // Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); // if (putIfAbsent != null) { @@ -240,7 +238,7 @@ public class SubscriptionManager { case 3: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time // SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.getSubscriptions(); +// subsPerType = subHolderSingle.publish(); // // Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); // if (putIfAbsent != null) { @@ -259,7 +257,7 @@ public class SubscriptionManager { default: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time // SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.getSubscriptions(); +// subsPerType = subHolderSingle.publish(); // // Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); // if (putIfAbsent != null) { @@ -356,26 +354,39 @@ public class SubscriptionManager { } // never return null - public final Subscription[] getSubscriptions(final Class messageClass) { - ArrayList collection; - - + public final Subscription[] getSubscriptions(final Class messageClass, boolean isArray) { StampedLock lock = this.lock; long stamp = lock.readLock(); // Lock readLock = this.lock.readLock(); // readLock.lock(); + final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); + + lock.unlockRead(stamp); +// readLock.unlock(); + + return subscriptions; + } + + // never return null + private Subscription[] getSubscriptions_NL(final Class messageClass, boolean isArray) { + ArrayList collection; + collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null - // now getSubscriptions superClasses - ArrayList superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null + // now publish superClasses + ArrayList superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null if (collection != null) { collection = new ArrayList(collection); - collection.addAll(superSubscriptions); + if (!superSubscriptions.isEmpty()) { + collection.addAll(superSubscriptions); + } } else { - collection = superSubscriptions; + if (!superSubscriptions.isEmpty()) { + collection = superSubscriptions; + } } final Subscription[] subscriptions; @@ -386,9 +397,6 @@ public class SubscriptionManager { subscriptions = null; } - lock.unlockRead(stamp); -// readLock.unlock(); - return subscriptions; } @@ -403,7 +411,7 @@ public class SubscriptionManager { // readLock.lock(); // // try { -// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); +// collection = this.subscriptionsPerMessageSingle.publish(messageType); // if (collection != null) { // subscriptions = collection.toArray(EMPTY); // } @@ -416,7 +424,7 @@ public class SubscriptionManager { // //// long stamp = this.lock.tryOptimisticRead(); // non blocking //// -//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); +//// collection = this.subscriptionsPerMessageSingle.publish(messageType); //// if (collection != null) { ////// subscriptions = new ArrayDeque<>(collection); //// subscriptions = new ArrayList<>(collection); @@ -429,7 +437,7 @@ public class SubscriptionManager { //// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock //// stamp = this.lock.readLock(); //// try { -//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); +//// collection = this.subscriptionsPerMessageSingle.publish(messageType); //// if (collection != null) { ////// subscriptions = new ArrayDeque<>(collection); //// subscriptions = new ArrayList<>(collection); @@ -480,12 +488,6 @@ public class SubscriptionManager { return this.subscriptionsPerMessageMulti.get(messageTypes); } - // CAN NOT RETURN NULL - // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI - // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSubscriptions(Class messageClass) { - return this.varArgUtils.getVarArgSubscriptions(messageClass); - } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI @@ -527,4 +529,103 @@ public class SubscriptionManager { public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { return this.utils.getSuperSubscriptions(superType1, superType2, superType3); } + + /** + * @return true if subscriptions were published + */ + public boolean publishExact(Object message) throws Throwable { + final Class messageClass = message.getClass(); + final boolean isArray = messageClass.isArray(); + + StampedLock lock = this.lock; + long stamp = lock.readLock(); + + final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); + Subscription sub; + + // Run subscriptions + if (subscriptions != null) { + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message); + } + + lock.unlockRead(stamp); + return true; + } + + lock.unlockRead(stamp); + return false; + } + + /** + * @return true if subscriptions were published + */ + public boolean publish(Object message) throws Throwable { + final Class messageClass = message.getClass(); + final boolean isArray = messageClass.isArray(); + + StampedLock lock = this.lock; + long stamp = lock.readLock(); + + + final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); + Subscription sub; + + // Run subscriptions + if (subscriptions != null) { + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message); + } + + // publish to var arg, only if not already an array + if (varArgPossibility.get() && !isArray) { + final ArrayList varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); + + if (varArgSubscriptions != null && !varArgSubscriptions.isEmpty()) { + Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); + asArray[0] = message; + + Iterator iterator; + for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) { + sub = iterator.next(); + + sub.publish(asArray); + } + + // now publish array based superClasses (but only if those ALSO accept vararg) + final Collection varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); + if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { + for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) { + sub = iterator.next(); + + sub.publish(asArray); + } + } + } + else { + // now publish array based superClasses (but only if those ALSO accept vararg) + final Collection varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); + if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { + Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); + asArray[0] = message; + + Iterator iterator; + for (iterator = varArgSuperSubscriptions.iterator(); iterator.hasNext(); ) { + sub = iterator.next(); + + sub.publish(asArray); + } + } + } + } + + lock.unlockRead(stamp); + return true; + } + + lock.unlockRead(stamp); + return false; + } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java index 5cd9696..921e2c9 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java @@ -30,11 +30,10 @@ public class SubscriptionUtils { private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - public SubscriptionUtils(Map, ArrayList> subscriptionsPerMessageSingle, + public SubscriptionUtils(SuperClassUtils superClass, Map, ArrayList> subscriptionsPerMessageSingle, HashMapTree, ArrayList> subscriptionsPerMessageMulti, float loadFactor, int stripeSize) { - - this.superClass = new SuperClassUtils(loadFactor, 1); + this.superClass = superClass; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti; @@ -75,7 +74,7 @@ public class SubscriptionUtils { // long stamp = lock.tryOptimisticRead(); // // if (stamp > 0) { -// ArrayList> arrayList = local.getSubscriptions(clazz); +// ArrayList> arrayList = local.publish(clazz); // if (arrayList != null) { // classes = arrayList.toArray(SUPER_CLASS_EMPTY); // @@ -84,7 +83,7 @@ public class SubscriptionUtils { // } else { // stamp = lock.readLock(); // -// arrayList = local.getSubscriptions(clazz); +// arrayList = local.publish(clazz); // if (arrayList != null) { // classes = arrayList.toArray(SUPER_CLASS_EMPTY); // lock.unlockRead(stamp); @@ -94,7 +93,7 @@ public class SubscriptionUtils { // } // } // -// // unable to getSubscriptions a valid subscription. Have to acquire a write lock +// // unable to publish a valid subscription. Have to acquire a write lock // long origStamp = stamp; // if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { // lock.unlockRead(origStamp); @@ -102,7 +101,7 @@ public class SubscriptionUtils { // } // // -// // getSubscriptions all super types of class +// // publish all super types of class // Collection> superTypes = ReflectionUtils.getSuperTypes(clazz); // ArrayList> arrayList = new ArrayList>(superTypes.size()); // Iterator> iterator; @@ -144,25 +143,25 @@ public class SubscriptionUtils { * * @return CAN NOT RETURN NULL */ - public final ArrayList getSuperSubscriptions(final Class clazz) { + public final ArrayList getSuperSubscriptions(final Class clazz, boolean isArray) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.superClassSubscriptions; ArrayList superSubscriptions = local.get(clazz); if (superSubscriptions == null) { - // types was not empty, so getSubscriptions subscriptions for each type and collate them + // types was not empty, so publish subscriptions for each type and collate them final Map, ArrayList> local2 = this.subscriptionsPerMessageSingle; // save the subscriptions - final Class[] superClasses = this.superClass.getSuperClasses(clazz, clazz.isArray()); // never returns null, cached response + final Class[] superClasses = this.superClass.getSuperClasses(clazz, isArray); // never returns null, cached response Class superClass; ArrayList superSubs; Subscription sub; final int length = superClasses.length; - int superSubLengh; + int superSubLength; superSubscriptions = new ArrayList(length); for (int i = 0; i < length; i++) { @@ -170,8 +169,8 @@ public class SubscriptionUtils { superSubs = local2.get(superClass); if (superSubs != null) { - superSubLengh = superSubs.size(); - for (int j = 0; j < superSubLengh; j++) { + superSubLength = superSubs.size(); + for (int j = 0; j < superSubLength; j++) { sub = superSubs.get(j); if (sub.acceptsSubtypes()) { @@ -181,6 +180,7 @@ public class SubscriptionUtils { } } + superSubscriptions.trimToSize(); local.put(clazz, superSubscriptions); } @@ -202,7 +202,7 @@ public class SubscriptionUtils { // subsPerType = subsPerTypeLeaf.getValue(); // } else { // SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.getSubscriptions(); +// subsPerType = subHolderSingle.publish(); // // // cache our subscriptions for super classes, so that their access can be fast! // ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); @@ -288,7 +288,7 @@ public class SubscriptionUtils { // subsPerType = subsPerTypeLeaf.getValue(); // } else { // SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.getSubscriptions(); +// subsPerType = subHolderSingle.publish(); // // // cache our subscriptions for super classes, so that their access can be fast! // ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); diff --git a/src/test/java/dorkbox/util/messagebus/SynchronizedHandlerTest.java b/src/test/java/dorkbox/util/messagebus/SynchronizedHandlerTest.java index 8c29744..9ba18a1 100644 --- a/src/test/java/dorkbox/util/messagebus/SynchronizedHandlerTest.java +++ b/src/test/java/dorkbox/util/messagebus/SynchronizedHandlerTest.java @@ -48,7 +48,7 @@ public class SynchronizedHandlerTest extends MessageBusTest { @Synchronized public void handleMessage(Object o){ counter.getAndIncrement(); -// System.err.println(counter.getSubscriptions()); +// System.err.println(counter.publish()); } } } diff --git a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java index 16d2031..bffbc90 100644 --- a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -38,7 +38,7 @@ public class SubscriptionValidator extends AssertSupport { // we split subs + superSubs into TWO calls. Collection collection = new ArrayDeque(8); - Subscription[] subscriptions = manager.getSubscriptions(messageType); + Subscription[] subscriptions = manager.getSubscriptions(messageType, messageType.isArray()); if (subscriptions != null) { collection.addAll(Arrays.asList(subscriptions)); }