Really good pub throughput/latency. sub/unsub need work

This commit is contained in:
nathan 2015-06-05 15:53:22 +02:00
parent bd25415dff
commit 3acd8f934f
8 changed files with 150 additions and 147 deletions

View File

@ -50,7 +50,7 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
this(Mode.Exact, numberOfThreads); this(Mode.ExactWithSuperTypes, numberOfThreads);
} }
/** /**
@ -109,14 +109,17 @@ public class MultiMBassador implements IMessageBus {
while (true) { while (true) {
IN_QUEUE.take(node); IN_QUEUE.take(node);
switch (node.messageType) { switch (node.messageType) {
case 1: case 1: {
publish(node.item1); publish(node.item1);
continue; break;
case 2: }
case 2: {
publish(node.item1, node.item2); publish(node.item1, node.item2);
continue; break;
case 3: }
default: {
publish(node.item1, node.item2, node.item3); publish(node.item1, node.item2, node.item3);
}
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -126,15 +129,15 @@ public class MultiMBassador implements IMessageBus {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1)); .setCause(e).setPublishedObject(node.item1));
continue; break;
} }
case 2: { case 2: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1, node.item2)); .setCause(e).setPublishedObject(node.item1, node.item2));
continue; break;
} }
case 3: { default: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e) .setCause(e)

View File

@ -64,8 +64,9 @@ public class MessageHandler {
} }
} }
MessageHandler[] array = finalMethods.toArray(new MessageHandler[finalMethods.size()]); final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()];
return array; finalMethods.toArray(messageHandlers);
return messageHandlers;
} }
private final MethodAccess handler; private final MethodAccess handler;

View File

@ -1,14 +1,14 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement; import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/** /**
* @author bennidi * @author bennidi
* Date: 2/16/12 * Date: 2/16/12
@ -22,7 +22,9 @@ public class ReflectionUtils {
ArrayList<Method> methods = new ArrayList<Method>(); ArrayList<Method> methods = new ArrayList<Method>();
getMethods(target, methods); getMethods(target, methods);
return methods.toArray(new Method[methods.size()]); final Method[] array = new Method[methods.size()];
methods.toArray(array);
return array;
} }
private static void getMethods(Class<?> target, ArrayList<Method> methods) { private static void getMethods(Class<?> target, ArrayList<Method> methods) {
@ -81,7 +83,9 @@ public class ReflectionUtils {
collectInterfaces( from, superclasses ); collectInterfaces( from, superclasses );
} }
return superclasses.toArray(new Class<?>[superclasses.size()]); final Class<?>[] classes = new Class<?>[superclasses.size()];
superclasses.toArray(classes);
return classes;
} }
public static void collectInterfaces( Class<?> from, Collection<Class<?>> accumulator ) { public static void collectInterfaces( Class<?> from, Collection<Class<?>> accumulator ) {

View File

@ -57,7 +57,8 @@ public class SuperClassUtils {
} }
} }
classes = newList.toArray(new Class<?>[newList.size()]); classes = new Class<?>[newList.size()];
newList.toArray(classes);
local.put(clazz, classes); local.put(clazz, classes);
} }

View File

@ -52,7 +52,7 @@ public class VarArgUtils {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public ArrayList<Subscription> getVarArgSubscriptions(Class<?> messageClass) { public Subscription[] getVarArgSubscriptions(Class<?> messageClass) {
Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions; Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions;
ArrayList<Subscription> varArgSubs = local.get(messageClass); ArrayList<Subscription> varArgSubs = local.get(messageClass);
@ -79,7 +79,10 @@ public class VarArgUtils {
} }
} }
return varArgSubs; // return varArgSubs;
return null;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
@ -115,10 +118,10 @@ public class VarArgUtils {
// CAN NOT RETURN NULL // CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) { public Subscription[] getVarArgSuperSubscriptions(Class<?> messageClass) {
// // whenever our subscriptions change, this map is cleared. // // whenever our subscriptions change, this map is cleared.
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions; // ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
// //

View File

@ -5,7 +5,6 @@ import dorkbox.util.messagebus.common.*;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -28,9 +27,7 @@ import java.util.concurrent.locks.StampedLock;
* @author dorkbox, llc * @author dorkbox, llc
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class SubscriptionManager { public final class SubscriptionManager {
private static final Subscription[] EMPTY = new Subscription[0];
private static final float LOAD_FACTOR = 0.8F; private static final float LOAD_FACTOR = 0.8F;
private final SubscriptionUtils utils; private final SubscriptionUtils utils;
@ -39,7 +36,7 @@ public class SubscriptionManager {
private final Map<Class<?>, Boolean> nonListeners; private final Map<Class<?>, Boolean> nonListeners;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private AtomicBoolean varArgPossibility = new AtomicBoolean(false); private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
@ -54,13 +51,9 @@ public class SubscriptionManager {
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener; private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final SuperClassUtils superClass;
private final VarArgUtils varArgUtils; private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock(); private final StampedLock lock = new StampedLock();
// private final ReadWriteLock lock = new ReentrantReadWriteLock();
public SubscriptionManager(int numberOfThreads) { public SubscriptionManager(int numberOfThreads) {
float loadFactor = SubscriptionManager.LOAD_FACTOR; float loadFactor = SubscriptionManager.LOAD_FACTOR;
@ -73,10 +66,10 @@ public class SubscriptionManager {
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor); this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
// only used during SUB/UNSUB // only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(); this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(32, LOAD_FACTOR, 1);
} }
this.superClass = new SuperClassUtils(loadFactor, 1); final SuperClassUtils superClass = new SuperClassUtils(loadFactor, 1);
this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor,
numberOfThreads); numberOfThreads);
@ -85,23 +78,24 @@ public class SubscriptionManager {
this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads);
} }
public void shutdown() { public final void shutdown() {
this.nonListeners.clear(); this.nonListeners.clear();
this.subscriptionsPerMessageSingle.clear(); this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear(); this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear(); this.subscriptionsPerListener.clear();
this.utils.shutdown();
clearConcurrentCollections(); clearConcurrentCollections();
this.utils.shutdown();
} }
public void subscribe(Object listener) { public final void subscribe(final Object listener) {
if (listener == null) { if (listener == null) {
return; return;
} }
Class<?> listenerClass = listener.getClass(); final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) { if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers // early reject of known classes that do not define message handlers
@ -118,8 +112,8 @@ public class SubscriptionManager {
// it is important to note that this section CAN be repeated, however the write lock is gained before // it is important to note that this section CAN be repeated, however the write lock is gained before
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized. // anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
int handlersSize = messageHandlers.length; final int handlersSize = messageHandlers.length;
// remember the class as non listening class if no handlers are found // remember the class as non listening class if no handlers are found
if (handlersSize == 0) { if (handlersSize == 0) {
@ -131,7 +125,7 @@ public class SubscriptionManager {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle; final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti; final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>(handlersSize); final Subscription[] subsPerListener = new Subscription[handlersSize];
Collection<Subscription> subsForPublication; Collection<Subscription> subsForPublication;
// create the subscription // create the subscription
@ -145,15 +139,15 @@ public class SubscriptionManager {
subscription = new Subscription(messageHandler); subscription = new Subscription(messageHandler);
subscription.subscribe(listener); subscription.subscribe(listener);
subsPerListener.add(subscription); // activates this sub for sub/unsub subsPerListener[i] = subscription; // activates this sub for sub/unsub
} }
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes. // of the huge number of reads compared to writes.
StampedLock lock = this.lock; final StampedLock lock = this.lock;
long stamp = lock.writeLock(); final long stamp = lock.writeLock();
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
subscriptions = subsPerListenerMap.get(listenerClass); subscriptions = subsPerListenerMap.get(listenerClass);
@ -164,7 +158,7 @@ public class SubscriptionManager {
final SubscriptionUtils utils = this.utils; final SubscriptionUtils utils = this.utils;
for (int i = 0; i < handlersSize; i++) { for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener.get(i); subscription = subsPerListener[i];
// now add this subscription to each of the handled types // now add this subscription to each of the handled types
subsForPublication = subscription subsForPublication = subscription
@ -174,12 +168,13 @@ public class SubscriptionManager {
subsForPublication.add(subscription); // activates this sub for publication subsForPublication.add(subscription); // activates this sub for publication
} }
subsPerListenerMap.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()])); subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp); lock.unlockWrite(stamp);
return; return;
} }
else { else {
// continue to subscription
lock.unlockWrite(stamp); lock.unlockWrite(stamp);
} }
} }
@ -194,12 +189,12 @@ public class SubscriptionManager {
} }
} }
public final void unsubscribe(Object listener) { public final void unsubscribe(final Object listener) {
if (listener == null) { if (listener == null) {
return; return;
} }
Class<?> listenerClass = listener.getClass(); final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) { if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers // early reject of known classes that do not define message handlers
return; return;
@ -208,7 +203,7 @@ public class SubscriptionManager {
// these are concurrent collections // these are concurrent collections
clearConcurrentCollections(); clearConcurrentCollections();
Subscription[] subscriptions = getListenerSubs(listenerClass); final Subscription[] subscriptions = getListenerSubs(listenerClass);
if (subscriptions != null) { if (subscriptions != null) {
Subscription subscription; Subscription subscription;
@ -225,94 +220,75 @@ public class SubscriptionManager {
this.varArgUtils.clear(); this.varArgUtils.clear();
} }
private Subscription[] getListenerSubs(Class<?> listenerClass) { private Subscription[] getListenerSubs(final Class<?> listenerClass) {
Subscription[] subscriptions;
StampedLock lock = this.lock; final StampedLock lock = this.lock;
long stamp = lock.readLock(); final long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
subscriptions = this.subscriptionsPerListener.get(listenerClass); final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
lock.unlockRead(stamp); lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions; return subscriptions;
} }
// retrieves all of the appropriate subscriptions for the message type // can return null
public final Subscription[] getSubscriptionsExact(final Class<?> messageClass) { public final Subscription[] getSubscriptionsExact(final Class<?> messageClass) {
ArrayList<Subscription> collection; final StampedLock lock = this.lock;
Subscription[] subscriptions; final long stamp = lock.readLock();
StampedLock lock = this.lock; final ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass);
long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass);
//
if (collection != null) { if (collection != null) {
subscriptions = collection.toArray(new Subscription[collection.size()]); final Subscription[] subscriptions = new Subscription[collection.size()];
} collection.toArray(subscriptions);
else {
// subscriptions = EMPTY; lock.unlockRead(stamp);
subscriptions = null; return subscriptions;
} }
lock.unlockRead(stamp); lock.unlockRead(stamp);
// readLock.unlock(); return null;
return subscriptions;
} }
// never return null // can return null
// used by unit tests only // public because it is also used by unit tests
public final Subscription[] getSubscriptions(final Class<?> messageClass, boolean isArray) { public final Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass, final boolean isArray) {
StampedLock lock = this.lock; final StampedLock lock = this.lock;
long stamp = lock.readLock(); final long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass, isArray);
lock.unlockRead(stamp); lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions; return subscriptions;
} }
// never return null // can return null
private Subscription[] getSubscriptions_NL(final Class<?> messageClass, boolean isArray) { private Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass, final boolean isArray) {
ArrayList<Subscription> collection; ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now publish superClasses // now publish superClasses
ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null final ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null
if (collection != null) { if (collection != null) {
collection = new ArrayList<Subscription>(collection); collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) { if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions); collection.addAll(superSubscriptions);
} }
} }
else { else if (!superSubscriptions.isEmpty()) {
if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions; collection = superSubscriptions;
}
} }
final Subscription[] subscriptions;
if (collection != null) { if (collection != null) {
subscriptions = collection.toArray(new Subscription[collection.size()]); final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
} }
else { else {
subscriptions = null; return null;
} }
return subscriptions;
} }
@ -404,12 +380,6 @@ public class SubscriptionManager {
} }
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) {
return this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
}
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
@ -445,70 +415,78 @@ public class SubscriptionManager {
return this.utils.getSuperSubscriptions(superType1, superType2, superType3); return this.utils.getSuperSubscriptions(superType1, superType2, superType3);
} }
public final void publishExact(Object message) throws Throwable { public final void publishExact(final Object message) throws Throwable {
final Class<?> messageClass = message.getClass(); final Class<?> messageClass = message.getClass();
final Subscription[] subscriptions = getSubscriptionsExact(messageClass); final Subscription[] subscriptions = getSubscriptionsExact(messageClass); // can return null
Subscription sub;
// Run subscriptions // Run subscriptions
if (subscriptions != null) { if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i]; sub = subscriptions[i];
sub.publish(message); sub.publish(message);
} }
} }
else { else {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) { if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message); final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) { for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i]; sub = deadSubscriptions[i];
sub.publish(deadMessage); sub.publish(deadMessage);
} }
} }
} }
} }
/** public final void publishExactAndSuper(final Object message) throws Throwable {
* @return true if subscriptions were published
*/
public final boolean publishExactAndSuper(Object message) throws Throwable {
final Class<?> messageClass = message.getClass(); final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray(); final boolean isArray = messageClass.isArray();
final Subscription[] subscriptions = getSubscriptions(messageClass, isArray); final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass, isArray); // can return null
Subscription sub;
// Run subscriptions // Run subscriptions
if (subscriptions != null) { if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i]; sub = subscriptions[i];
sub.publish(message); sub.publish(message);
} }
return true;
} }
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
return false; Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} }
/** public final void publishAll(final Object message) throws Throwable {
* @return true if subscriptions were published
*/
public boolean publishAll(Object message) throws Throwable {
final Class<?> messageClass = message.getClass(); final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray(); final boolean isArray = messageClass.isArray();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray); final StampedLock lock = this.lock;
Subscription sub; long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass, isArray); // can return null
lock.unlockRead(stamp);
// Run subscriptions // Run subscriptions
if (subscriptions != null) { if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i]; sub = subscriptions[i];
sub.publish(message); sub.publish(message);
@ -516,52 +494,65 @@ public class SubscriptionManager {
// publish to var arg, only if not already an array // publish to var arg, only if not already an array
if (varArgPossibility.get() && !isArray) { if (varArgPossibility.get() && !isArray) {
// StampedLock lock = this.lock; stamp = lock.readLock();
// long stamp = lock.readLock(); final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); // can return null
lock.unlockRead(stamp);
final ArrayList<Subscription> varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
if (varArgSubscriptions != null && !varArgSubscriptions.isEmpty()) {
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message; asArray[0] = message;
Iterator<Subscription> iterator;
for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) {
sub = iterator.next();
sub.publish(asArray); for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub.publish(message);
} }
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg) // now publish array based superClasses (but only if those ALSO accept vararg)
final Collection<Subscription> varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { lock.unlockRead(stamp);
for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) {
sub = iterator.next();
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray); sub.publish(asArray);
} }
} }
} }
else { else {
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg) // now publish array based superClasses (but only if those ALSO accept vararg)
final Collection<Subscription> varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass); final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) { lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1); Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message; asArray[0] = message;
Iterator<Subscription> iterator; for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
for (iterator = varArgSuperSubscriptions.iterator(); iterator.hasNext(); ) { sub = varArgSuperSubscriptions[i];
sub = iterator.next();
sub.publish(asArray); sub.publish(asArray);
} }
} }
} }
} }
return;
return true;
} }
return false; // only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
} }
} }

View File

@ -143,7 +143,7 @@ public class SubscriptionUtils {
* *
* @return CAN NOT RETURN NULL * @return CAN NOT RETURN NULL
*/ */
public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, boolean isArray) { public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, final boolean isArray) {
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions; final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;

View File

@ -38,7 +38,7 @@ public class SubscriptionValidator extends AssertSupport {
// we split subs + superSubs into TWO calls. // we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8); Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Subscription[] subscriptions = manager.getSubscriptions(messageType, messageType.isArray()); Subscription[] subscriptions = manager.getSubscriptionsExactAndSuper(messageType, messageType.isArray());
if (subscriptions != null) { if (subscriptions != null) {
collection.addAll(Arrays.asList(subscriptions)); collection.addAll(Arrays.asList(subscriptions));
} }