This commit is contained in:
nathan 2015-02-28 01:30:51 +01:00
parent 7d410e78be
commit e257aced31

View File

@ -6,7 +6,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
@ -74,9 +73,6 @@ public class SubscriptionManager {
private final Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
private final Map<Class<?>, Collection<Subscription>> varArgSuperClassSubscriptions;
// to keep track if we really need to clear our maps
private final AtomicBoolean superCheck = new AtomicBoolean();
// stripe size of maps for concurrency
private final int STRIPE_SIZE;
@ -146,12 +142,9 @@ public class SubscriptionManager {
}
// these are concurrent collections
boolean compareAndSet = this.superCheck.compareAndSet(true, false);
if (compareAndSet) {
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
}
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
@ -169,11 +162,9 @@ public class SubscriptionManager {
this.nonListeners.put(listenerClass, Boolean.TRUE);
return;
} else {
// really was null
subsPerListener = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1);
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
for (MessageHandler messageHandler : messageHandlers) {
Collection<Subscription> subsPerType = null;
@ -235,8 +226,9 @@ public class SubscriptionManager {
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
subsPerListener.add(subscription);
subsPerType.add(subscription);
subsPerListener.add(subscription); // activates this sub for sub/unsub
subsPerType.add(subscription); // activates this sub for publication
}
subsPerListener2.put(listenerClass, subsPerListener);
@ -261,21 +253,15 @@ public class SubscriptionManager {
return;
}
boolean compareAndSet = this.superCheck.compareAndSet(true, false);
if (compareAndSet) {
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
}
// these are concurrent collections
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
synchronized(listenerClass) {
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
}
@ -354,7 +340,6 @@ public class SubscriptionManager {
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
this.superCheck.set(true);
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared.
@ -388,7 +373,6 @@ public class SubscriptionManager {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
this.superCheck.set(true);
Map<Class<?>, Collection<Subscription>> local = this.varArgSuperClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
@ -432,7 +416,6 @@ public class SubscriptionManager {
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
this.superCheck.set(true);
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
@ -474,7 +457,6 @@ public class SubscriptionManager {
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
this.superCheck.set(true);
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
@ -548,7 +530,6 @@ public class SubscriptionManager {
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
this.superCheck.set(true);
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.