diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index ac5ca81..985360f 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -118,7 +118,7 @@ class MessageBus implements IMessageBus { /** * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) */ - this.subscriptionManager = new SubscriptionManager(numberOfThreads); + this.subscriptionManager = new SubscriptionManager(); switch (publishMode) { case Exact: diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 96700a6..53b8ad9 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -69,15 +69,14 @@ class SubscriptionManager { private volatile IdentityMap, Subscription[]> subsSuperSingle; private volatile IdentityMap subsSuperMulti; - // In order to force the "Single writer principle" on subscribe & unsubscribe, they are within WRITE LOCKS. They could be dispatched - // to another thread, however we do NOT want them asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is - // called before. A WriteLock doesn't perform any better here than synchronized does. + // In order to force the "single writer principle" for subscribe & unsubscribe, they are within SYNCHRONIZED. + // + // These methods **COULD** be dispatched via another thread (so it's only one thread ever touching them), however we do NOT want them + // asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is called before. 'Synchronized' is good enough here. private final Object singleWriterLock = new Object(); - private final ClassTree> classTree; - private final ClassUtils classUtils; @@ -101,10 +100,11 @@ class SubscriptionManager { private static final AtomicReferenceFieldUpdater subsSuperMultiREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, IdentityMap.class, - "subsSuperSingle"); + "subsSuperMulti"); public - SubscriptionManager(final int numberOfThreads) { + SubscriptionManager() { + // not all platforms support ASM. ASM is our default, and is just-as-fast and directly invoking the method if (MessageBus.useAsmForDispatch) { this.subMaker = new SubMakerAsm(); } @@ -113,6 +113,8 @@ class SubscriptionManager { } classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR); + classTree = new ClassTree>(); + // modified ONLY during SUB/UNSUB nonListeners = new IdentityMap, Boolean>(16, LOAD_FACTOR); @@ -121,11 +123,9 @@ class SubscriptionManager { subsMulti = new IdentityMap(32, LOAD_FACTOR); + // modified during publication, however duplicates are OK, we we can "pretend" it's the same as the single-writer-principle subsSuperSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR); subsSuperMulti = new IdentityMap(32, LOAD_FACTOR); - - - classTree = new ClassTree>(); } /** @@ -153,7 +153,10 @@ class SubscriptionManager { this.subsPerListener.clear(); this.subsSingle.clear(); + this.subsMulti.clear(); + this.subsSuperSingle.clear(); + this.subsSuperMulti.clear(); this.classTree.clear(); this.classUtils.shutdown(); @@ -345,17 +348,6 @@ class SubscriptionManager { subsPerListener.put(listenerClass, subscriptions); - // dump the super subscriptions - IdentityMap, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this); - superSingleSubs.clear(); - subsSuperSingleREF.lazySet((this), superSingleSubs); - - IdentityMap superMultiSubs = subsSuperMultiREF.get(this); - superMultiSubs.clear(); - subsSuperMultiREF.lazySet((this), superMultiSubs); - - - // save this snapshot back to the original (single writer principle) subsSingleREF.lazySet(this, singleSubs); subsMultiREF.lazySet(this, multiSubs); @@ -363,9 +355,13 @@ class SubscriptionManager { // only dump the super subscriptions if it is a COMPLETELY NEW subscription. // If it's not new, then the hierarchy isn't changing for super subscriptions - final IdentityMap, Subscription[]> localSuperSubs = subsSuperSingleREF.get(this); - localSuperSubs.clear(); - subsSuperSingleREF.lazySet(this, localSuperSubs); + IdentityMap, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this); + superSingleSubs.clear(); + subsSuperSingleREF.lazySet(this, superSingleSubs); + + IdentityMap superMultiSubs = subsSuperMultiREF.get(this); + superMultiSubs.clear(); + subsSuperMultiREF.lazySet(this, superMultiSubs); } else { // subscriptions already exist and must only be updated diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 2dba1d7..c2ae238 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -220,7 +220,7 @@ public class SubscriptionManagerTest extends AssertSupport { ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class); final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final SubscriptionManager subscriptionManager = new SubscriptionManager(1); + final SubscriptionManager subscriptionManager = new SubscriptionManager(); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); @@ -247,7 +247,7 @@ public class SubscriptionManagerTest extends AssertSupport { private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) { final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final SubscriptionManager subscriptionManager = new SubscriptionManager(1); + final SubscriptionManager subscriptionManager = new SubscriptionManager(); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);