diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 377f1ed..2711344 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -6,6 +6,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; @@ -73,6 +74,9 @@ public class SubscriptionManager { private final Map, Collection> varArgSubscriptions; private final Map, Collection> varArgSuperClassSubscriptions; + // to keep track if we really need to clear our maps + private final AtomicBoolean superCheck = new AtomicBoolean(); + // stripe size of maps for concurrency private final int STRIPE_SIZE; @@ -142,9 +146,12 @@ public class SubscriptionManager { } // these are concurrent collections - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); + boolean compareAndSet = this.superCheck.compareAndSet(true, false); + if (compareAndSet) { + this.superClassSubscriptions.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); + } // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. @@ -254,10 +261,12 @@ public class SubscriptionManager { return; } - // these are concurrent collections - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); + boolean compareAndSet = this.superCheck.compareAndSet(true, false); + if (compareAndSet) { + this.superClassSubscriptions.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); + } // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency @@ -345,6 +354,7 @@ public class SubscriptionManager { // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions public Collection getVarArgSubscriptions(Class varArgType) { + this.superCheck.set(true); Map, Collection> local = this.varArgSubscriptions; // whenever our subscriptions change, this map is cleared. @@ -378,6 +388,7 @@ public class SubscriptionManager { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions public Collection getVarArgSuperSubscriptions(Class varArgType) { + this.superCheck.set(true); Map, Collection> local = this.varArgSuperClassSubscriptions; // whenever our subscriptions change, this map is cleared. @@ -421,6 +432,7 @@ public class SubscriptionManager { // ALSO checks to see if the superClass accepts subtypes. public final Collection getSuperSubscriptions(Class superType) { + this.superCheck.set(true); Map, Collection> local = this.superClassSubscriptions; // whenever our subscriptions change, this map is cleared. @@ -462,6 +474,7 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType1, Class superType2) { + this.superCheck.set(true); HashMapTree, Collection> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. @@ -535,6 +548,7 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { + this.superCheck.set(true); HashMapTree, Collection> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index abd932c..5312f76 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -92,6 +92,9 @@ public class Subscription { return this.listeners.size(); } + /** + * @return true if there were listeners for this publication, false if there was nothing + */ public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { Collection listeners = this.listeners; @@ -143,6 +146,9 @@ public class Subscription { return false; } + /** + * @return true if there were listeners for this publication, false if there was nothing + */ public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { Collection listeners = this.listeners; @@ -198,6 +204,9 @@ public class Subscription { return false; } + /** + * @return true if there were listeners for this publication, false if there was nothing + */ public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { Collection listeners = this.listeners; diff --git a/src/test/java/dorkbox/util/messagebus/SubscriptionManagerTest.java b/src/test/java/dorkbox/util/messagebus/SubscriptionManagerTest.java index 2081d19..fcdcd5e 100644 --- a/src/test/java/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/src/test/java/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -2,7 +2,6 @@ package dorkbox.util.messagebus; import org.junit.Test; -import dorkbox.util.messagebus.SubscriptionManager; import dorkbox.util.messagebus.common.AssertSupport; import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ListenerFactory; @@ -36,7 +35,6 @@ import dorkbox.util.messagebus.messages.StandardMessage; public class SubscriptionManagerTest extends AssertSupport { private static final int InstancesPerListener = 5000; - private static final int ConcurrentUnits = 1; @Test public void testIMessageListener(){ @@ -190,7 +188,6 @@ public class SubscriptionManagerTest extends AssertSupport { ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1); listeners.clear(); - validator.clear(); validator.validate(subscriptionManager); } diff --git a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java index 1118530..bac5d2e 100644 --- a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -26,11 +26,11 @@ public class SubscriptionValidator extends AssertSupport{ this.subscribedListener = subscribedListener; } - public Expectation listener(Class subscriber) { + public Expectation listener(Class subscriber){ return new Expectation(subscriber); } - private SubscriptionValidator expect(Class subscriber, Class messageType) { + private SubscriptionValidator expect(Class subscriber, Class messageType){ this.validations.add(new ValidationEntry(messageType, subscriber)); this.messageTypes.add(messageType); return this; @@ -38,8 +38,8 @@ public class SubscriptionValidator extends AssertSupport{ // match subscriptions with existing validation entries // for each tuple of subscriber and message type the specified number of listeners must exist - public void validate(SubscriptionManager manager) { - for (Class messageType : this.messageTypes) { + public void validate(SubscriptionManager manager){ + for (Class messageType : this.messageTypes){ Collection validationEntries = getEntries(messageType); // we split subs + superSubs into TWO calls. @@ -54,6 +54,8 @@ public class SubscriptionValidator extends AssertSupport{ } assertEquals(validationEntries.size(), collection.size()); + + for(ValidationEntry validationValidationEntry : validationEntries){ Subscription matchingSub = null; // one of the subscriptions must belong to the subscriber type @@ -70,15 +72,11 @@ public class SubscriptionValidator extends AssertSupport{ } - public void clear() { - this.validations.clear(); - } - - - private Collection getEntries(Class messageType) { + private Collection getEntries(Class messageType){ Collection matching = new LinkedList(); for (ValidationEntry validationValidationEntry : this.validations){ - if (validationValidationEntry.messageType.equals(messageType)) { + + if(validationValidationEntry.messageType.equals(messageType)) { matching.add(validationValidationEntry); } } @@ -86,16 +84,18 @@ public class SubscriptionValidator extends AssertSupport{ } - public class Expectation { - private Class listener; - private Expectation(Class listener) { + public class Expectation{ + + private Class listener; + + private Expectation(Class listener) { this.listener = listener; } - public SubscriptionValidator handles(Class ...messages){ - for(Class message : messages) { + public SubscriptionValidator handles(Class ...messages){ + for(Class message : messages) { expect(this.listener, message); } return SubscriptionValidator.this; @@ -103,12 +103,18 @@ public class SubscriptionValidator extends AssertSupport{ } private class ValidationEntry { - private Class subscriber; - private Class messageType; - private ValidationEntry(Class messageType, Class subscriber) { + + private Class subscriber; + + private Class messageType; + + private ValidationEntry(Class messageType, Class subscriber) { this.messageType = messageType; this.subscriber = subscriber; } + + } + }