Tuned concurrency
This commit is contained in:
parent
d0584d52b0
commit
3315f079d5
@ -1,7 +1,6 @@
|
|||||||
package net.engio.mbassy.multi.common;
|
package net.engio.mbassy.multi.common;
|
||||||
import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
|
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This implementation uses strong references to the elements.
|
* This implementation uses strong references to the elements.
|
||||||
@ -18,7 +17,8 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public StrongConcurrentSet(int size, float loadFactor) {
|
public StrongConcurrentSet(int size, float loadFactor) {
|
||||||
super(new Reference2ReferenceOpenHashMap<T, ISetEntry<T>>(size, loadFactor));
|
super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1));
|
||||||
|
// super(new Reference2ReferenceOpenHashMap<T, ISetEntry<T>>(size, loadFactor));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -10,7 +10,6 @@ import java.util.Iterator;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
||||||
import net.engio.mbassy.multi.common.ReflectionUtils;
|
import net.engio.mbassy.multi.common.ReflectionUtils;
|
||||||
@ -31,8 +30,6 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
|
|||||||
* Date: 2/2/15
|
* Date: 2/2/15
|
||||||
*/
|
*/
|
||||||
public class SubscriptionManager {
|
public class SubscriptionManager {
|
||||||
private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4;
|
|
||||||
|
|
||||||
private final int MAP_STRIPING;
|
private final int MAP_STRIPING;
|
||||||
private float LOAD_FACTOR;
|
private float LOAD_FACTOR;
|
||||||
|
|
||||||
@ -58,36 +55,34 @@ public class SubscriptionManager {
|
|||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
||||||
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
|
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
|
||||||
private volatile Map<Class<?>, FastEntrySet<Subscription>> superClassSubscriptions;
|
private Map<Class<?>, FastEntrySet<Subscription>> superClassSubscriptions;
|
||||||
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
|
||||||
|
|
||||||
// remember already processed classes that do not contain any message handlers
|
// remember already processed classes that do not contain any message handlers
|
||||||
private final Map<Class<?>, Object> nonListeners;
|
private final Map<Class<?>, Object> nonListeners;
|
||||||
|
|
||||||
// synchronize read/write access to the subscription maps
|
|
||||||
private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
|
|
||||||
|
|
||||||
|
|
||||||
public SubscriptionManager(int numberOfThreads) {
|
public SubscriptionManager(int numberOfThreads) {
|
||||||
this.MAP_STRIPING = numberOfThreads;
|
this.MAP_STRIPING = numberOfThreads/2;
|
||||||
this.LOAD_FACTOR = 0.8f;
|
this.LOAD_FACTOR = 0.8f;
|
||||||
|
|
||||||
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
|
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
|
||||||
// only used during SUB/UNSUB
|
// only used during SUB/UNSUB
|
||||||
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
|
||||||
this.superClassesCache = new ConcurrentHashMap<Class<?>, FastEntrySet<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
this.superClassesCache = new ConcurrentHashMap<Class<?>, FastEntrySet<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
|
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||||
|
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, FastEntrySet<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
|
||||||
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void resetSuperClassSubs() {
|
private final void resetSuperClassSubs() {
|
||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
this.superClassSubscriptions.clear();
|
||||||
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
|
||||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, FastEntrySet<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribe(Object listener) {
|
public void unsubscribe(Object listener) {
|
||||||
@ -161,9 +156,6 @@ public class SubscriptionManager {
|
|||||||
private final ThreadLocal<Collection<Subscription>> subInitialValue = new ThreadLocal<Collection<Subscription>>() {
|
private final ThreadLocal<Collection<Subscription>> subInitialValue = new ThreadLocal<Collection<Subscription>>() {
|
||||||
@Override
|
@Override
|
||||||
protected java.util.Collection<Subscription> initialValue() {
|
protected java.util.Collection<Subscription> initialValue() {
|
||||||
// return new ArrayDeque<Subscription>(8);
|
|
||||||
// return Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR));
|
|
||||||
// return Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, SubscriptionManager.this.LOAD_FACTOR, SubscriptionManager.this.MAP_STRIPING));
|
|
||||||
return new StrongConcurrentSet<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR);
|
return new StrongConcurrentSet<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -547,10 +539,6 @@ public class SubscriptionManager {
|
|||||||
if (!this.superClassesCache.containsKey(clazz)) {
|
if (!this.superClassesCache.containsKey(clazz)) {
|
||||||
// it doesn't matter if concurrent access stomps on values, since they are always the same.
|
// it doesn't matter if concurrent access stomps on values, since they are always the same.
|
||||||
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
|
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
|
||||||
// types = new ArrayDeque<Class<?>>(superTypes);
|
|
||||||
// types = new StrongConcurrentSet<Class<?>>(superTypes.size(), this.LOAD_FACTOR);
|
|
||||||
// types.addAll(superTypes);
|
|
||||||
|
|
||||||
Reference2BooleanArrayMap<Class<?>> map = new Reference2BooleanArrayMap<Class<?>>(superTypes.size() + 1);
|
Reference2BooleanArrayMap<Class<?>> map = new Reference2BooleanArrayMap<Class<?>>(superTypes.size() + 1);
|
||||||
for (Class<?> c : superTypes) {
|
for (Class<?> c : superTypes) {
|
||||||
map.put(c, Boolean.TRUE);
|
map.put(c, Boolean.TRUE);
|
||||||
|
Loading…
Reference in New Issue
Block a user