diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 25b4864..43434fe 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -39,19 +39,19 @@ public class MultiMBassador implements IMessageBus { * removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the * system. By default, this is FALSE, to support subTypes and VarArg matching. */ - private final boolean forceExactMatches = false; + private final boolean forceExactMatches; /** - * Notifies the consumers that the messagebus is shutting down. + * Notifies the consumers during shutdown, that it's on purpose. */ private volatile boolean shuttingDown; /** - * By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages + * By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages */ public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()); + this(Runtime.getRuntime().availableProcessors()/2); } /** @@ -73,6 +73,7 @@ public class MultiMBassador implements IMessageBus { numberOfThreads = 2; // at LEAST 2 threads } numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads); + this.forceExactMatches = forceExactMatches; this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads); @@ -163,7 +164,7 @@ public class MultiMBassador implements IMessageBus { } @Override - public boolean hasPendingMessages() { + public final boolean hasPendingMessages() { return this.dispatchQueue.hasPendingMessages(); } @@ -177,7 +178,7 @@ public class MultiMBassador implements IMessageBus { } @Override - public void publish(Object message) { + public void publish(final Object message) { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); @@ -272,29 +273,39 @@ public class MultiMBassador implements IMessageBus { } @Override - public void publish(Object message1, Object message2) { + public void publish(final Object message1, final Object message2) { SubscriptionManager manager = this.subscriptionManager; Class messageClass1 = message1.getClass(); Class messageClass2 = message2.getClass(); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); + StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); boolean subsPublished = false; + ISetEntry current; + Subscription sub; // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { - for (Subscription sub : subscriptions) { + current = subscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, message1, message2); } } if (!this.forceExactMatches) { - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); + StrongConcurrentSetV8 superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // now get superClasses if (superSubscriptions != null && !superSubscriptions.isEmpty()) { - for (Subscription sub : superSubscriptions) { + current = superSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, message1, message2); } @@ -304,19 +315,23 @@ public class MultiMBassador implements IMessageBus { if (messageClass1 == messageClass2 && !messageClass1.isArray()) { Object[] asArray = null; - Collection varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray[0] = message1; asArray[1] = message2; - for (Subscription sub : varargSubscriptions) { + current = varargSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } } - Collection varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (asArray == null) { @@ -325,7 +340,11 @@ public class MultiMBassador implements IMessageBus { asArray[1] = message2; } - for (Subscription sub : varargSuperSubscriptions) { + current = varargSuperSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } @@ -336,10 +355,15 @@ public class MultiMBassador implements IMessageBus { if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) - Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message1, message2); - for (Subscription sub : deadSubscriptions) { + + current = deadSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types sub.publishToSubscription(this, deadMessage); } @@ -348,19 +372,26 @@ public class MultiMBassador implements IMessageBus { } @Override - public void publish(Object message1, Object message2, Object message3) { + public void publish(final Object message1, final Object message2, final Object message3) { SubscriptionManager manager = this.subscriptionManager; Class messageClass1 = message1.getClass(); Class messageClass2 = message2.getClass(); Class messageClass3 = message3.getClass(); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); + StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); boolean subsPublished = false; + ISetEntry current; + Subscription sub; + // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { - for (Subscription sub : subscriptions) { + current = subscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, message1, message2, message3); } @@ -368,10 +399,14 @@ public class MultiMBassador implements IMessageBus { if (!this.forceExactMatches) { - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); + StrongConcurrentSetV8 superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // now get superClasses if (superSubscriptions != null && !superSubscriptions.isEmpty()) { - for (Subscription sub : superSubscriptions) { + current = superSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types sub.publishToSubscription(this, message1, message2, message3); } @@ -380,20 +415,24 @@ public class MultiMBassador implements IMessageBus { // publish to var arg, only if not already an array if (messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) { Object[] asArray = null; - Collection varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray[0] = message1; asArray[1] = message2; asArray[2] = message3; - for (Subscription sub : varargSubscriptions) { + current = varargSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } } - Collection varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (asArray == null) { @@ -403,7 +442,11 @@ public class MultiMBassador implements IMessageBus { asArray[2] = message3; } - for (Subscription sub : varargSuperSubscriptions) { + current = varargSuperSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } @@ -414,10 +457,15 @@ public class MultiMBassador implements IMessageBus { if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) - Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - for (Subscription sub : deadSubscriptions) { + + current = deadSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + // this catches all exception types sub.publishToSubscription(this, deadMessage); } @@ -430,12 +478,14 @@ public class MultiMBassador implements IMessageBus { if (message != null) { try { this.dispatchQueue.transfer(message); - } catch (InterruptedException e) { + } catch (Exception e) { handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); } + } else { + throw new NullPointerException("Message cannot be null."); } } @@ -444,12 +494,14 @@ public class MultiMBassador implements IMessageBus { if (message1 != null && message2 != null) { try { this.dispatchQueue.transfer(message1, message2); - } catch (InterruptedException e) { + } catch (Exception e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") .setCause(e) .setPublishedObject(message1, message2)); } + } else { + throw new NullPointerException("Messages cannot be null."); } } @@ -458,12 +510,14 @@ public class MultiMBassador implements IMessageBus { if (message1 != null || message2 != null | message3 != null) { try { this.dispatchQueue.transfer(message1, message2, message3); - } catch (InterruptedException e) { + } catch (Exception e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") .setCause(e) .setPublishedObject(message1, message2, message3)); } + } else { + throw new NullPointerException("Messages cannot be null."); } } } diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index a846747..5275d9b 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -52,7 +52,7 @@ public class SubscriptionManager { // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time private final ConcurrentMap, StrongConcurrentSetV8> subscriptionsPerMessageSingle; - private final HashMapTree, Collection> subscriptionsPerMessageMulti; + private final HashMapTree, StrongConcurrentSetV8> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing @@ -67,7 +67,7 @@ public class SubscriptionManager { // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one private final Map, StrongConcurrentSetV8> superClassSubscriptions; - private final HashMapTree, Collection> superClassSubscriptionsMulti; + private final HashMapTree, StrongConcurrentSetV8> superClassSubscriptionsMulti; private final Map, StrongConcurrentSetV8> varArgSubscriptions; private final Map, StrongConcurrentSetV8> varArgSuperClassSubscriptions; @@ -87,7 +87,7 @@ public class SubscriptionManager { this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, SubscriptionManager.LOAD_FACTOR); this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, Collection>(4, SubscriptionManager.LOAD_FACTOR); + this.subscriptionsPerMessageMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, 1); @@ -101,7 +101,7 @@ public class SubscriptionManager { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers this.superClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.superClassSubscriptionsMulti = new HashMapTree, Collection>(4, SubscriptionManager.LOAD_FACTOR); + this.superClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers @@ -340,13 +340,13 @@ public class SubscriptionManager { } // CAN RETURN NULL - public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { + public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType1, Class messageType2) { return this.subscriptionsPerMessageMulti.get(messageType1, messageType2); } // CAN RETURN NULL - public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { + public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); } @@ -506,12 +506,12 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. - public Collection getSuperSubscriptions(Class superType1, Class superType2) { - HashMapTree, Collection> local = this.superClassSubscriptionsMulti; + public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2) { + HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, Collection> subsPerTypeLeaf = local.getLeaf(superType1, superType2); - Collection subsPerType = null; + HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(superType1, superType2); + StrongConcurrentSetV8 subsPerType = null; // we DO NOT care about duplicate, because the answers will be the same if (subsPerTypeLeaf != null) { @@ -521,12 +521,24 @@ public class SubscriptionManager { subsPerType = new StrongConcurrentSetV8(16, LOAD_FACTOR, this.STRIPE_SIZE); // whenever our subscriptions change, this map is cleared. - Collection> types1 = this.superClassesCache.get(superType1); - Collection> types2 = this.superClassesCache.get(superType2); + StrongConcurrentSetV8> types1 = this.superClassesCache.get(superType1); + StrongConcurrentSetV8> types2 = this.superClassesCache.get(superType2); + + StrongConcurrentSetV8 subs; + HashMapTree, StrongConcurrentSetV8> leaf1; + HashMapTree, StrongConcurrentSetV8> leaf2; + +// ISetEntry> current1; +// Class eventSuperType1; +// +// current1 = types1.head; +// while (current1 != null) { +// eventSuperType1 = current1.getValue(); +// current1 = current1.next(); +// +// } + - Collection subs; - HashMapTree, Collection> leaf1; - HashMapTree, Collection> leaf2; Iterator> iterator1 = new SuperClassIterator(superType1, types1); Iterator> iterator2; @@ -567,7 +579,7 @@ public class SubscriptionManager { } } - Collection putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); + StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); if (putIfAbsent != null) { // someone beat us subsPerType = putIfAbsent; @@ -579,12 +591,12 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. - public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { - HashMapTree, Collection> local = this.superClassSubscriptionsMulti; + public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { + HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, Collection> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); - Collection subsPerType; + HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); + StrongConcurrentSetV8 subsPerType; // we DO NOT care about duplicate, because the answers will be the same @@ -599,9 +611,9 @@ public class SubscriptionManager { subsPerType = new StrongConcurrentSetV8(16, LOAD_FACTOR, this.STRIPE_SIZE); Collection subs; - HashMapTree, Collection> leaf1; - HashMapTree, Collection> leaf2; - HashMapTree, Collection> leaf3; + HashMapTree, StrongConcurrentSetV8> leaf1; + HashMapTree, StrongConcurrentSetV8> leaf2; + HashMapTree, StrongConcurrentSetV8> leaf3; Iterator> iterator1 = new SuperClassIterator(superType1, types1); Iterator> iterator2; @@ -658,7 +670,7 @@ public class SubscriptionManager { } } - Collection putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); + StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); if (putIfAbsent != null) { // someone beat us subsPerType = putIfAbsent;