From 3acd8f934fad5369f7e5f64e6cc3413c31088b0d Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 5 Jun 2015 15:53:22 +0200 Subject: [PATCH] Really good pub throughput/latency. sub/unsub need work --- .../util/messagebus/MultiMBassador.java | 21 +- .../messagebus/common/MessageHandler.java | 5 +- .../messagebus/common/ReflectionUtils.java | 14 +- .../messagebus/common/SuperClassUtils.java | 3 +- .../util/messagebus/common/VarArgUtils.java | 11 +- .../subscription/SubscriptionManager.java | 239 +++++++++--------- .../subscription/SubscriptionUtils.java | 2 +- .../common/SubscriptionValidator.java | 2 +- 8 files changed, 150 insertions(+), 147 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index c9a6496..be9200a 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -50,7 +50,7 @@ public class MultiMBassador implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MultiMBassador(int numberOfThreads) { - this(Mode.Exact, numberOfThreads); + this(Mode.ExactWithSuperTypes, numberOfThreads); } /** @@ -109,14 +109,17 @@ public class MultiMBassador implements IMessageBus { while (true) { IN_QUEUE.take(node); switch (node.messageType) { - case 1: + case 1: { publish(node.item1); - continue; - case 2: + break; + } + case 2: { publish(node.item1, node.item2); - continue; - case 3: + break; + } + default: { publish(node.item1, node.item2, node.item3); + } } } } catch (InterruptedException e) { @@ -126,15 +129,15 @@ public class MultiMBassador implements IMessageBus { handlePublicationError( new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e).setPublishedObject(node.item1)); - continue; + break; } case 2: { handlePublicationError( new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e).setPublishedObject(node.item1, node.item2)); - continue; + break; } - case 3: { + default: { handlePublicationError( new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e) diff --git a/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java index ed0a0ee..aa082da 100644 --- a/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java +++ b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java @@ -64,8 +64,9 @@ public class MessageHandler { } } - MessageHandler[] array = finalMethods.toArray(new MessageHandler[finalMethods.size()]); - return array; + final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()]; + finalMethods.toArray(messageHandlers); + return messageHandlers; } private final MethodAccess handler; diff --git a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java index 5f8d7eb..c407525 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java @@ -1,14 +1,14 @@ package dorkbox.util.messagebus.common; +import dorkbox.util.messagebus.annotations.Handler; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; + import java.lang.annotation.Annotation; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; -import dorkbox.util.messagebus.annotations.Handler; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; - /** * @author bennidi * Date: 2/16/12 @@ -22,7 +22,9 @@ public class ReflectionUtils { ArrayList methods = new ArrayList(); getMethods(target, methods); - return methods.toArray(new Method[methods.size()]); + final Method[] array = new Method[methods.size()]; + methods.toArray(array); + return array; } private static void getMethods(Class target, ArrayList methods) { @@ -81,7 +83,9 @@ public class ReflectionUtils { collectInterfaces( from, superclasses ); } - return superclasses.toArray(new Class[superclasses.size()]); + final Class[] classes = new Class[superclasses.size()]; + superclasses.toArray(classes); + return classes; } public static void collectInterfaces( Class from, Collection> accumulator ) { diff --git a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java b/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java index b9f58da..49dcb51 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SuperClassUtils.java @@ -57,7 +57,8 @@ public class SuperClassUtils { } } - classes = newList.toArray(new Class[newList.size()]); + classes = new Class[newList.size()]; + newList.toArray(classes); local.put(clazz, classes); } diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index 573c94a..b5eeb45 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -52,7 +52,7 @@ public class VarArgUtils { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions - public ArrayList getVarArgSubscriptions(Class messageClass) { + public Subscription[] getVarArgSubscriptions(Class messageClass) { Map, ArrayList> local = this.varArgSubscriptions; ArrayList varArgSubs = local.get(messageClass); @@ -79,7 +79,10 @@ public class VarArgUtils { } } - return varArgSubs; +// return varArgSubs; + + return null; + // whenever our subscriptions change, this map is cleared. // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; @@ -115,10 +118,10 @@ public class VarArgUtils { - // CAN NOT RETURN NULL + // CAN RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass) { + public Subscription[] getVarArgSuperSubscriptions(Class messageClass) { // // whenever our subscriptions change, this map is cleared. // ConcurrentMap, ConcurrentSet> local = this.varArgSuperClassSubscriptions; // diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index afba0e4..a59f54b 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -5,7 +5,6 @@ import dorkbox.util.messagebus.common.*; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,9 +27,7 @@ import java.util.concurrent.locks.StampedLock; * @author dorkbox, llc * Date: 2/2/15 */ -public class SubscriptionManager { - private static final Subscription[] EMPTY = new Subscription[0]; - +public final class SubscriptionManager { private static final float LOAD_FACTOR = 0.8F; private final SubscriptionUtils utils; @@ -39,7 +36,7 @@ public class SubscriptionManager { private final Map, Boolean> nonListeners; // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - private AtomicBoolean varArgPossibility = new AtomicBoolean(false); + private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message @@ -54,13 +51,9 @@ public class SubscriptionManager { private final ConcurrentMap, Subscription[]> subscriptionsPerListener; - private final SuperClassUtils superClass; private final VarArgUtils varArgUtils; - - private final StampedLock lock = new StampedLock(); - // private final ReadWriteLock lock = new ReentrantReadWriteLock(); public SubscriptionManager(int numberOfThreads) { float loadFactor = SubscriptionManager.LOAD_FACTOR; @@ -73,10 +66,10 @@ public class SubscriptionManager { this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, loadFactor); // only used during SUB/UNSUB - this.subscriptionsPerListener = new ConcurrentHashMapV8, Subscription[]>(); + this.subscriptionsPerListener = new ConcurrentHashMapV8, Subscription[]>(32, LOAD_FACTOR, 1); } - this.superClass = new SuperClassUtils(loadFactor, 1); + final SuperClassUtils superClass = new SuperClassUtils(loadFactor, 1); this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads); @@ -85,23 +78,24 @@ public class SubscriptionManager { this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); } - public void shutdown() { + public final void shutdown() { this.nonListeners.clear(); this.subscriptionsPerMessageSingle.clear(); this.subscriptionsPerMessageMulti.clear(); this.subscriptionsPerListener.clear(); - this.utils.shutdown(); clearConcurrentCollections(); + + this.utils.shutdown(); } - public void subscribe(Object listener) { + public final void subscribe(final Object listener) { if (listener == null) { return; } - Class listenerClass = listener.getClass(); + final Class listenerClass = listener.getClass(); if (this.nonListeners.containsKey(listenerClass)) { // early reject of known classes that do not define message handlers @@ -118,8 +112,8 @@ public class SubscriptionManager { // it is important to note that this section CAN be repeated, however the write lock is gained before // anything 'permanent' is saved. This is so the time spent inside the writelock is minimized. - MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); - int handlersSize = messageHandlers.length; + final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); + final int handlersSize = messageHandlers.length; // remember the class as non listening class if no handlers are found if (handlersSize == 0) { @@ -131,7 +125,7 @@ public class SubscriptionManager { final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; - final ArrayList subsPerListener = new ArrayList(handlersSize); + final Subscription[] subsPerListener = new Subscription[handlersSize]; Collection subsForPublication; // create the subscription @@ -145,15 +139,15 @@ public class SubscriptionManager { subscription = new Subscription(messageHandler); subscription.subscribe(listener); - subsPerListener.add(subscription); // activates this sub for sub/unsub + subsPerListener[i] = subscription; // activates this sub for sub/unsub } // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // of the huge number of reads compared to writes. - StampedLock lock = this.lock; - long stamp = lock.writeLock(); + final StampedLock lock = this.lock; + final long stamp = lock.writeLock(); final ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; subscriptions = subsPerListenerMap.get(listenerClass); @@ -164,7 +158,7 @@ public class SubscriptionManager { final SubscriptionUtils utils = this.utils; for (int i = 0; i < handlersSize; i++) { - subscription = subsPerListener.get(i); + subscription = subsPerListener[i]; // now add this subscription to each of the handled types subsForPublication = subscription @@ -174,12 +168,13 @@ public class SubscriptionManager { subsForPublication.add(subscription); // activates this sub for publication } - subsPerListenerMap.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()])); + subsPerListenerMap.put(listenerClass, subsPerListener); lock.unlockWrite(stamp); return; } else { + // continue to subscription lock.unlockWrite(stamp); } } @@ -194,12 +189,12 @@ public class SubscriptionManager { } } - public final void unsubscribe(Object listener) { + public final void unsubscribe(final Object listener) { if (listener == null) { return; } - Class listenerClass = listener.getClass(); + final Class listenerClass = listener.getClass(); if (this.nonListeners.containsKey(listenerClass)) { // early reject of known classes that do not define message handlers return; @@ -208,7 +203,7 @@ public class SubscriptionManager { // these are concurrent collections clearConcurrentCollections(); - Subscription[] subscriptions = getListenerSubs(listenerClass); + final Subscription[] subscriptions = getListenerSubs(listenerClass); if (subscriptions != null) { Subscription subscription; @@ -225,94 +220,75 @@ public class SubscriptionManager { this.varArgUtils.clear(); } - private Subscription[] getListenerSubs(Class listenerClass) { - Subscription[] subscriptions; + private Subscription[] getListenerSubs(final Class listenerClass) { - StampedLock lock = this.lock; - long stamp = lock.readLock(); -// Lock readLock = this.lock.readLock(); -// readLock.lock(); + final StampedLock lock = this.lock; + final long stamp = lock.readLock(); - subscriptions = this.subscriptionsPerListener.get(listenerClass); + final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass); lock.unlockRead(stamp); -// readLock.unlock(); - return subscriptions; } - // retrieves all of the appropriate subscriptions for the message type + // can return null public final Subscription[] getSubscriptionsExact(final Class messageClass) { - ArrayList collection; - Subscription[] subscriptions; + final StampedLock lock = this.lock; + final long stamp = lock.readLock(); - StampedLock lock = this.lock; - long stamp = lock.readLock(); -// Lock readLock = this.lock.readLock(); -// readLock.lock(); + final ArrayList collection = this.subscriptionsPerMessageSingle.get(messageClass); - collection = this.subscriptionsPerMessageSingle.get(messageClass); -// if (collection != null) { - subscriptions = collection.toArray(new Subscription[collection.size()]); - } - else { -// subscriptions = EMPTY; - subscriptions = null; + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + lock.unlockRead(stamp); + return subscriptions; } lock.unlockRead(stamp); -// readLock.unlock(); - return subscriptions; + return null; } - // never return null - // used by unit tests only - public final Subscription[] getSubscriptions(final Class messageClass, boolean isArray) { - StampedLock lock = this.lock; - long stamp = lock.readLock(); -// Lock readLock = this.lock.readLock(); -// readLock.lock(); + // can return null + // public because it is also used by unit tests + public final Subscription[] getSubscriptionsExactAndSuper(final Class messageClass, final boolean isArray) { + final StampedLock lock = this.lock; + final long stamp = lock.readLock(); - final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); + final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass, isArray); lock.unlockRead(stamp); -// readLock.unlock(); - return subscriptions; } - // never return null - private Subscription[] getSubscriptions_NL(final Class messageClass, boolean isArray) { - ArrayList collection; - - collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null + // can return null + private Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class messageClass, final boolean isArray) { + ArrayList collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null // now publish superClasses - ArrayList superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null + final ArrayList superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null if (collection != null) { collection = new ArrayList(collection); + if (!superSubscriptions.isEmpty()) { collection.addAll(superSubscriptions); } } - else { - if (!superSubscriptions.isEmpty()) { + else if (!superSubscriptions.isEmpty()) { collection = superSubscriptions; - } } - final Subscription[] subscriptions; if (collection != null) { - subscriptions = collection.toArray(new Subscription[collection.size()]); + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; } else { - subscriptions = null; + return null; } - - return subscriptions; } @@ -404,12 +380,6 @@ public class SubscriptionManager { } - // CAN NOT RETURN NULL - // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI - // and then, returns the array'd version subscriptions - public Collection getVarArgSuperSubscriptions(Class messageClass) { - return this.varArgUtils.getVarArgSuperSubscriptions(messageClass); - } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI @@ -445,70 +415,78 @@ public class SubscriptionManager { return this.utils.getSuperSubscriptions(superType1, superType2, superType3); } - public final void publishExact(Object message) throws Throwable { + public final void publishExact(final Object message) throws Throwable { final Class messageClass = message.getClass(); - final Subscription[] subscriptions = getSubscriptionsExact(messageClass); - Subscription sub; + final Subscription[] subscriptions = getSubscriptionsExact(messageClass); // can return null // Run subscriptions if (subscriptions != null) { + Subscription sub; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; - sub.publish(message); } } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message); + Subscription sub; for (int i = 0; i < deadSubscriptions.length; i++) { sub = deadSubscriptions[i]; - sub.publish(deadMessage); } } } } - /** - * @return true if subscriptions were published - */ - public final boolean publishExactAndSuper(Object message) throws Throwable { + public final void publishExactAndSuper(final Object message) throws Throwable { final Class messageClass = message.getClass(); final boolean isArray = messageClass.isArray(); - final Subscription[] subscriptions = getSubscriptions(messageClass, isArray); - Subscription sub; + final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass, isArray); // can return null // Run subscriptions if (subscriptions != null) { + Subscription sub; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; sub.publish(message); } - - return true; } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message); - return false; + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } } - /** - * @return true if subscriptions were published - */ - public boolean publishAll(Object message) throws Throwable { + public final void publishAll(final Object message) throws Throwable { final Class messageClass = message.getClass(); final boolean isArray = messageClass.isArray(); - final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); - Subscription sub; + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + + final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass, isArray); // can return null + + lock.unlockRead(stamp); + // Run subscriptions if (subscriptions != null) { + Subscription sub; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; sub.publish(message); @@ -516,52 +494,65 @@ public class SubscriptionManager { // publish to var arg, only if not already an array if (varArgPossibility.get() && !isArray) { -// StampedLock lock = this.lock; -// long stamp = lock.readLock(); + stamp = lock.readLock(); + final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); // can return null + lock.unlockRead(stamp); - final ArrayList varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); - - if (varArgSubscriptions != null && !varArgSubscriptions.isEmpty()) { + if (varArgSubscriptions != null) { + final int length = varArgSubscriptions.length; Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message; - Iterator iterator; - for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) { - sub = iterator.next(); - sub.publish(asArray); + for (int i = 0; i < length; i++) { + sub = varArgSubscriptions[i]; + sub.publish(message); } + stamp = lock.readLock(); // now publish array based superClasses (but only if those ALSO accept vararg) - final Collection varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); - if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { - for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) { - sub = iterator.next(); + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass); + lock.unlockRead(stamp); + if (varArgSuperSubscriptions != null) { + for (int i = 0; i < length; i++) { + sub = varArgSuperSubscriptions[i]; sub.publish(asArray); } } } else { + stamp = lock.readLock(); + // now publish array based superClasses (but only if those ALSO accept vararg) - final Collection varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); - if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass); + lock.unlockRead(stamp); + + if (varArgSuperSubscriptions != null) { Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message; - Iterator iterator; - for (iterator = varArgSuperSubscriptions.iterator(); iterator.hasNext(); ) { - sub = iterator.next(); - + for (int i = 0; i < varArgSuperSubscriptions.length; i++) { + sub = varArgSuperSubscriptions[i]; sub.publish(asArray); } } } } - - return true; + return; } - return false; + // only get here if there were no other subscriptions + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java index 921e2c9..48ed1a3 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionUtils.java @@ -143,7 +143,7 @@ public class SubscriptionUtils { * * @return CAN NOT RETURN NULL */ - public final ArrayList getSuperSubscriptions(final Class clazz, boolean isArray) { + public final ArrayList getSuperSubscriptions(final Class clazz, final boolean isArray) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.superClassSubscriptions; diff --git a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java index bffbc90..bffc77a 100644 --- a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -38,7 +38,7 @@ public class SubscriptionValidator extends AssertSupport { // we split subs + superSubs into TWO calls. Collection collection = new ArrayDeque(8); - Subscription[] subscriptions = manager.getSubscriptions(messageType, messageType.isArray()); + Subscription[] subscriptions = manager.getSubscriptionsExactAndSuper(messageType, messageType.isArray()); if (subscriptions != null) { collection.addAll(Arrays.asList(subscriptions)); }