Code polish and comments
This commit is contained in:
parent
7e262d1f0c
commit
969e21d762
|
@ -118,7 +118,7 @@ class MessageBus implements IMessageBus {
|
||||||
/**
|
/**
|
||||||
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
|
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
|
||||||
*/
|
*/
|
||||||
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
|
this.subscriptionManager = new SubscriptionManager();
|
||||||
|
|
||||||
switch (publishMode) {
|
switch (publishMode) {
|
||||||
case Exact:
|
case Exact:
|
||||||
|
|
|
@ -69,15 +69,14 @@ class SubscriptionManager {
|
||||||
private volatile IdentityMap<Class<?>, Subscription[]> subsSuperSingle;
|
private volatile IdentityMap<Class<?>, Subscription[]> subsSuperSingle;
|
||||||
private volatile IdentityMap<MultiClass, Subscription[]> subsSuperMulti;
|
private volatile IdentityMap<MultiClass, Subscription[]> subsSuperMulti;
|
||||||
|
|
||||||
// In order to force the "Single writer principle" on subscribe & unsubscribe, they are within WRITE LOCKS. They could be dispatched
|
// In order to force the "single writer principle" for subscribe & unsubscribe, they are within SYNCHRONIZED.
|
||||||
// to another thread, however we do NOT want them asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is
|
//
|
||||||
// called before. A WriteLock doesn't perform any better here than synchronized does.
|
// These methods **COULD** be dispatched via another thread (so it's only one thread ever touching them), however we do NOT want them
|
||||||
|
// asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is called before. 'Synchronized' is good enough here.
|
||||||
private final Object singleWriterLock = new Object();
|
private final Object singleWriterLock = new Object();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private final ClassTree<Class<?>> classTree;
|
private final ClassTree<Class<?>> classTree;
|
||||||
|
|
||||||
private final ClassUtils classUtils;
|
private final ClassUtils classUtils;
|
||||||
|
|
||||||
|
|
||||||
|
@ -101,10 +100,11 @@ class SubscriptionManager {
|
||||||
private static final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsSuperMultiREF =
|
private static final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsSuperMultiREF =
|
||||||
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
||||||
IdentityMap.class,
|
IdentityMap.class,
|
||||||
"subsSuperSingle");
|
"subsSuperMulti");
|
||||||
|
|
||||||
public
|
public
|
||||||
SubscriptionManager(final int numberOfThreads) {
|
SubscriptionManager() {
|
||||||
|
// not all platforms support ASM. ASM is our default, and is just-as-fast and directly invoking the method
|
||||||
if (MessageBus.useAsmForDispatch) {
|
if (MessageBus.useAsmForDispatch) {
|
||||||
this.subMaker = new SubMakerAsm();
|
this.subMaker = new SubMakerAsm();
|
||||||
}
|
}
|
||||||
|
@ -113,6 +113,8 @@ class SubscriptionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
|
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
|
||||||
|
classTree = new ClassTree<Class<?>>();
|
||||||
|
|
||||||
|
|
||||||
// modified ONLY during SUB/UNSUB
|
// modified ONLY during SUB/UNSUB
|
||||||
nonListeners = new IdentityMap<Class<?>, Boolean>(16, LOAD_FACTOR);
|
nonListeners = new IdentityMap<Class<?>, Boolean>(16, LOAD_FACTOR);
|
||||||
|
@ -121,11 +123,9 @@ class SubscriptionManager {
|
||||||
subsMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
|
subsMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
|
||||||
|
|
||||||
|
|
||||||
|
// modified during publication, however duplicates are OK, we we can "pretend" it's the same as the single-writer-principle
|
||||||
subsSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
subsSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||||
subsSuperMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
|
subsSuperMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
|
||||||
|
|
||||||
|
|
||||||
classTree = new ClassTree<Class<?>>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -153,7 +153,10 @@ class SubscriptionManager {
|
||||||
this.subsPerListener.clear();
|
this.subsPerListener.clear();
|
||||||
|
|
||||||
this.subsSingle.clear();
|
this.subsSingle.clear();
|
||||||
|
this.subsMulti.clear();
|
||||||
|
|
||||||
this.subsSuperSingle.clear();
|
this.subsSuperSingle.clear();
|
||||||
|
this.subsSuperMulti.clear();
|
||||||
|
|
||||||
this.classTree.clear();
|
this.classTree.clear();
|
||||||
this.classUtils.shutdown();
|
this.classUtils.shutdown();
|
||||||
|
@ -345,17 +348,6 @@ class SubscriptionManager {
|
||||||
subsPerListener.put(listenerClass, subscriptions);
|
subsPerListener.put(listenerClass, subscriptions);
|
||||||
|
|
||||||
|
|
||||||
// dump the super subscriptions
|
|
||||||
IdentityMap<Class<?>, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this);
|
|
||||||
superSingleSubs.clear();
|
|
||||||
subsSuperSingleREF.lazySet((this), superSingleSubs);
|
|
||||||
|
|
||||||
IdentityMap<MultiClass, Subscription[]> superMultiSubs = subsSuperMultiREF.get(this);
|
|
||||||
superMultiSubs.clear();
|
|
||||||
subsSuperMultiREF.lazySet((this), superMultiSubs);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// save this snapshot back to the original (single writer principle)
|
// save this snapshot back to the original (single writer principle)
|
||||||
subsSingleREF.lazySet(this, singleSubs);
|
subsSingleREF.lazySet(this, singleSubs);
|
||||||
subsMultiREF.lazySet(this, multiSubs);
|
subsMultiREF.lazySet(this, multiSubs);
|
||||||
|
@ -363,9 +355,13 @@ class SubscriptionManager {
|
||||||
|
|
||||||
// only dump the super subscriptions if it is a COMPLETELY NEW subscription.
|
// only dump the super subscriptions if it is a COMPLETELY NEW subscription.
|
||||||
// If it's not new, then the hierarchy isn't changing for super subscriptions
|
// If it's not new, then the hierarchy isn't changing for super subscriptions
|
||||||
final IdentityMap<Class<?>, Subscription[]> localSuperSubs = subsSuperSingleREF.get(this);
|
IdentityMap<Class<?>, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this);
|
||||||
localSuperSubs.clear();
|
superSingleSubs.clear();
|
||||||
subsSuperSingleREF.lazySet(this, localSuperSubs);
|
subsSuperSingleREF.lazySet(this, superSingleSubs);
|
||||||
|
|
||||||
|
IdentityMap<MultiClass, Subscription[]> superMultiSubs = subsSuperMultiREF.get(this);
|
||||||
|
superMultiSubs.clear();
|
||||||
|
subsSuperMultiREF.lazySet(this, superMultiSubs);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// subscriptions already exist and must only be updated
|
// subscriptions already exist and must only be updated
|
||||||
|
|
|
@ -220,7 +220,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
||||||
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
|
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
|
||||||
|
|
||||||
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
|
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
|
||||||
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
|
final SubscriptionManager subscriptionManager = new SubscriptionManager();
|
||||||
|
|
||||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
|
||||||
|
|
||||||
|
@ -247,7 +247,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
||||||
|
|
||||||
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
|
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
|
||||||
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
|
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
|
||||||
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
|
final SubscriptionManager subscriptionManager = new SubscriptionManager();
|
||||||
|
|
||||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user