diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 585dcae..b54b195 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -1,7 +1,6 @@ package net.engio.mbassy.multi.common; -import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap; - import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; /** * This implementation uses strong references to the elements. @@ -18,7 +17,8 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { } public StrongConcurrentSet(int size, float loadFactor) { - super(new Reference2ReferenceOpenHashMap>(size, loadFactor)); + super(new ConcurrentHashMap>(size, loadFactor, 1)); +// super(new Reference2ReferenceOpenHashMap>(size, loadFactor)); } @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 26d4aa2..797f363 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -10,7 +10,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; @@ -31,8 +30,6 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException; * Date: 2/2/15 */ public class SubscriptionManager { - private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4; - private final int MAP_STRIPING; private float LOAD_FACTOR; @@ -58,36 +55,34 @@ public class SubscriptionManager { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private volatile Map, FastEntrySet> superClassSubscriptions; + private Map, FastEntrySet> superClassSubscriptions; // private final IdentityObjectTree, Collection> superClassSubscriptionsMulti = new IdentityObjectTree, Collection>(); // remember already processed classes that do not contain any message handlers private final Map, Object> nonListeners; - // synchronize read/write access to the subscription maps - private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); - public SubscriptionManager(int numberOfThreads) { - this.MAP_STRIPING = numberOfThreads; + this.MAP_STRIPING = numberOfThreads/2; this.LOAD_FACTOR = 0.8f; - this.subscriptionsPerMessageSingle = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, 1); + this.subscriptionsPerMessageSingle = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.superClassesCache = new ConcurrentHashMap, FastEntrySet>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance on handlers + this.superClassSubscriptions = new ConcurrentHashMap, FastEntrySet>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.nonListeners = new ConcurrentHashMap, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); } private final void resetSuperClassSubs() { - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance on handlers - this.superClassSubscriptions = new ConcurrentHashMap, FastEntrySet>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassSubscriptions.clear(); } public void unsubscribe(Object listener) { @@ -161,9 +156,6 @@ public class SubscriptionManager { private final ThreadLocal> subInitialValue = new ThreadLocal>() { @Override protected java.util.Collection initialValue() { -// return new ArrayDeque(8); -// return Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, SubscriptionManager.this.LOAD_FACTOR)); -// return Collections.newSetFromMap(new ConcurrentHashMap(8, SubscriptionManager.this.LOAD_FACTOR, SubscriptionManager.this.MAP_STRIPING)); return new StrongConcurrentSet(8, SubscriptionManager.this.LOAD_FACTOR); } }; @@ -547,10 +539,6 @@ public class SubscriptionManager { if (!this.superClassesCache.containsKey(clazz)) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); -// types = new ArrayDeque>(superTypes); -// types = new StrongConcurrentSet>(superTypes.size(), this.LOAD_FACTOR); -// types.addAll(superTypes); - Reference2BooleanArrayMap> map = new Reference2BooleanArrayMap>(superTypes.size() + 1); for (Class c : superTypes) { map.put(c, Boolean.TRUE);