WIP - playing with concurrent maps + ability to have non-blocking for updates
This commit is contained in:
parent
8b69d53263
commit
120054afd4
@ -8,8 +8,8 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
import net.engio.mbassy.multi.common.DeadMessage;
|
import net.engio.mbassy.multi.common.DeadMessage;
|
||||||
import net.engio.mbassy.multi.common.NamedThreadFactory;
|
|
||||||
import net.engio.mbassy.multi.common.LinkedTransferQueue;
|
import net.engio.mbassy.multi.common.LinkedTransferQueue;
|
||||||
|
import net.engio.mbassy.multi.common.NamedThreadFactory;
|
||||||
import net.engio.mbassy.multi.common.TransferQueue;
|
import net.engio.mbassy.multi.common.TransferQueue;
|
||||||
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
|
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
|
||||||
import net.engio.mbassy.multi.error.PublicationError;
|
import net.engio.mbassy.multi.error.PublicationError;
|
||||||
@ -29,7 +29,7 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
// this handler will receive all errors that occur during message dispatch or message handling
|
// this handler will receive all errors that occur during message dispatch or message handling
|
||||||
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
|
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
|
||||||
|
|
||||||
private final SubscriptionManager subscriptionManager = new SubscriptionManager();
|
private final SubscriptionManager subscriptionManager;
|
||||||
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
|
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
|
||||||
|
|
||||||
|
|
||||||
@ -38,6 +38,7 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
|
|
||||||
public MultiMBassador() {
|
public MultiMBassador() {
|
||||||
this(Runtime.getRuntime().availableProcessors());
|
this(Runtime.getRuntime().availableProcessors());
|
||||||
|
// this(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -46,6 +47,7 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
numberOfThreads = 1; // at LEAST 1 thread
|
numberOfThreads = 1; // at LEAST 1 thread
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
|
||||||
this.threads = new ArrayList<Thread>(numberOfThreads);
|
this.threads = new ArrayList<Thread>(numberOfThreads);
|
||||||
|
|
||||||
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
|
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
|
||||||
@ -124,45 +126,42 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("null")
|
|
||||||
@Override
|
@Override
|
||||||
public void publish(Object message) {
|
public void publish(Object message) {
|
||||||
SubscriptionManager manager = this.subscriptionManager;
|
SubscriptionManager manager = this.subscriptionManager;
|
||||||
|
|
||||||
Class<?> messageClass = message.getClass();
|
Class<?> messageClass = message.getClass();
|
||||||
manager.readLock();
|
// manager.readLock();
|
||||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||||
boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
|
|
||||||
|
|
||||||
Collection<Subscription> deadSubscriptions = null;
|
|
||||||
if (!validSubs) {
|
|
||||||
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
|
|
||||||
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
||||||
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
|
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
|
||||||
manager.readUnLock();
|
// manager.readUnLock();
|
||||||
|
|
||||||
|
|
||||||
// Run subscriptions
|
// Run subscriptions
|
||||||
if (validSubs) {
|
if (subscriptions != null && !subscriptions.isEmpty()) {
|
||||||
for (Subscription sub : subscriptions) {
|
for (Subscription sub : subscriptions) {
|
||||||
// this catches all exception types
|
// this catches all exception types
|
||||||
sub.publishToSubscription(this, message);
|
sub.publishToSubscription(this, message);
|
||||||
}
|
}
|
||||||
} else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
} else {
|
||||||
DeadMessage deadMessage = new DeadMessage(message);
|
// manager.readLock();
|
||||||
|
// Dead Event must EXACTLY MATCH (no subclasses or varargs permitted)
|
||||||
|
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||||
|
// manager.readUnLock();
|
||||||
|
|
||||||
|
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||||
|
DeadMessage deadMessage = new DeadMessage(message);
|
||||||
|
for (Subscription sub : deadSubscriptions) {
|
||||||
|
// this catches all exception types
|
||||||
|
sub.publishToSubscription(this, deadMessage);
|
||||||
|
}
|
||||||
|
|
||||||
for (Subscription sub : deadSubscriptions) {
|
|
||||||
// this catches all exception types
|
|
||||||
sub.publishToSubscription(this, deadMessage);
|
|
||||||
}
|
}
|
||||||
// Dead Event. only matches EXACT handlers (no vararg, no subclasses)
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// now get superClasses
|
// now get superClasses
|
||||||
if (superSubscriptions != null) {
|
if (superSubscriptions != null) {
|
||||||
for (Subscription sub : superSubscriptions) {
|
for (Subscription sub : superSubscriptions) {
|
||||||
@ -171,6 +170,8 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// now get varargs
|
// now get varargs
|
||||||
if (varArgs != null && !varArgs.isEmpty()) {
|
if (varArgs != null && !varArgs.isEmpty()) {
|
||||||
// messy, but the ONLY way to do it.
|
// messy, but the ONLY way to do it.
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -1,7 +1,7 @@
|
|||||||
package net.engio.mbassy.multi.common;
|
package net.engio.mbassy.multi.common;
|
||||||
|
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -228,7 +228,7 @@ public class IdentityObjectTree<KEY, VALUE> {
|
|||||||
IdentityObjectTree<KEY, VALUE> objectTree;
|
IdentityObjectTree<KEY, VALUE> objectTree;
|
||||||
|
|
||||||
if (this.children == null) {
|
if (this.children == null) {
|
||||||
this.children = new IdentityHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2);
|
this.children = new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(4, .9f, 1);
|
||||||
|
|
||||||
// might as well add too
|
// might as well add too
|
||||||
objectTree = new IdentityObjectTree<KEY, VALUE>();
|
objectTree = new IdentityObjectTree<KEY, VALUE>();
|
||||||
|
@ -13,10 +13,10 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
|
|||||||
|
|
||||||
|
|
||||||
public StrongConcurrentSet() {
|
public StrongConcurrentSet() {
|
||||||
this(16);
|
this(16, .75f);
|
||||||
}
|
}
|
||||||
|
|
||||||
public StrongConcurrentSet(int size) {
|
public StrongConcurrentSet(int size, float lOAD_FACTOR) {
|
||||||
super(new IdentityHashMap<T, ISetEntry<T>>(size));
|
super(new IdentityHashMap<T, ISetEntry<T>>(size));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,14 +0,0 @@
|
|||||||
package net.engio.mbassy.multi.common;
|
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import net.engio.mbassy.multi.subscription.Subscription;
|
|
||||||
import dorkbox.util.objectPool.PoolableObject;
|
|
||||||
|
|
||||||
public class SubscriptionPoolable implements PoolableObject<Collection<Subscription>> {
|
|
||||||
@Override
|
|
||||||
public Collection<Subscription> create() {
|
|
||||||
return new ArrayDeque<Subscription>(64);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,8 +1,7 @@
|
|||||||
package net.engio.mbassy.multi.listener;
|
package net.engio.mbassy.multi.listener;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus,
|
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus,
|
||||||
@ -21,7 +20,7 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public class MessageListener {
|
public class MessageListener {
|
||||||
|
|
||||||
private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
|
private Collection<MessageHandler> handlers = new ArrayDeque<MessageHandler>();
|
||||||
private Class<?> listenerDefinition;
|
private Class<?> listenerDefinition;
|
||||||
|
|
||||||
public MessageListener(Class<?> listenerDefinition) {
|
public MessageListener(Class<?> listenerDefinition) {
|
||||||
@ -41,7 +40,7 @@ public class MessageListener {
|
|||||||
return this.handlers.add(messageHandler);
|
return this.handlers.add(messageHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MessageHandler> getHandlers() {
|
public Collection<MessageHandler> getHandlers() {
|
||||||
return this.handlers;
|
return this.handlers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,13 +3,11 @@ package net.engio.mbassy.multi.subscription;
|
|||||||
import java.lang.reflect.Array;
|
import java.lang.reflect.Array;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
||||||
import net.engio.mbassy.multi.common.ReflectionUtils;
|
import net.engio.mbassy.multi.common.ReflectionUtils;
|
||||||
@ -34,6 +32,8 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
|
|||||||
public class SubscriptionManager {
|
public class SubscriptionManager {
|
||||||
private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4;
|
private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4;
|
||||||
|
|
||||||
|
private final int MAP_STRIPING;
|
||||||
|
private float LOAD_FACTOR;
|
||||||
|
|
||||||
// the metadata reader that is used to inspect objects passed to the subscribe method
|
// the metadata reader that is used to inspect objects passed to the subscribe method
|
||||||
private final MetadataReader metadataReader = new MetadataReader();
|
private final MetadataReader metadataReader = new MetadataReader();
|
||||||
@ -41,32 +41,58 @@ public class SubscriptionManager {
|
|||||||
// all subscriptions per message type
|
// all subscriptions per message type
|
||||||
// this is the primary list for dispatching a specific message
|
// this is the primary list for dispatching a specific message
|
||||||
// write access is synchronized and happens only when a listener of a specific class is registered the first time
|
// write access is synchronized and happens only when a listener of a specific class is registered the first time
|
||||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
|
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
|
||||||
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
|
||||||
|
|
||||||
// all subscriptions per messageHandler type
|
// all subscriptions per messageHandler type
|
||||||
// this map provides fast access for subscribing and unsubscribing
|
// this map provides fast access for subscribing and unsubscribing
|
||||||
// write access is synchronized and happens very infrequently
|
// write access is synchronized and happens very infrequently
|
||||||
// once a collection of subscriptions is stored it does not change
|
// once a collection of subscriptions is stored it does not change
|
||||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
|
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener;
|
||||||
|
|
||||||
private final Object holder = new Object[0];
|
private final Object holder = new Object[0];
|
||||||
|
|
||||||
// remember classes that can have VarArg casting performed
|
// remember classes that can have VarArg casting performed
|
||||||
private final ConcurrentHashMap<Class<?>, Class<?>> varArgClasses = new ConcurrentHashMap<Class<?>, Class<?>>();
|
private final Map<Class<?>, Class<?>> varArgClasses;
|
||||||
|
|
||||||
private final Map<Class<?>, ArrayDeque<Class<?>>> superClassesCache = new IdentityHashMap<Class<?>, ArrayDeque<Class<?>>>();
|
private final Map<Class<?>, Collection<Class<?>>> superClassesCache;
|
||||||
// private final Map<Class<?>, Collection<Subscription>> superClassSubscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
|
|
||||||
|
|
||||||
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
|
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
||||||
|
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
|
||||||
|
private volatile Map<Class<?>, Collection<Subscription>> superClassSubscriptions;
|
||||||
|
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
|
||||||
|
|
||||||
// remember already processed classes that do not contain any message handlers
|
// remember already processed classes that do not contain any message handlers
|
||||||
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
|
private final Map<Class<?>, Object> nonListeners;
|
||||||
|
|
||||||
// synchronize read/write acces to the subscription maps
|
// synchronize read/write acces to the subscription maps
|
||||||
private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
|
private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
|
||||||
|
|
||||||
public SubscriptionManager() {
|
public SubscriptionManager(int numberOfThreads) {
|
||||||
|
this.MAP_STRIPING = 1;
|
||||||
|
this.LOAD_FACTOR = 0.8f;
|
||||||
|
|
||||||
|
// this.subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(4);
|
||||||
|
// this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
//
|
||||||
|
// // only used during SUB/UNSUB
|
||||||
|
// this.subscriptionsPerListener = new IdentityHashMap<Class<?>, Collection<Subscription>>(4);
|
||||||
|
//
|
||||||
|
// this.varArgClasses = new IdentityHashMap<Class<?>, Class<?>>(8);
|
||||||
|
// this.superClassesCache = new IdentityHashMap<Class<?>, Collection<Class<?>>>(8);
|
||||||
|
|
||||||
|
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
|
||||||
|
// only used during SUB/UNSUB
|
||||||
|
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
|
||||||
|
|
||||||
|
this.varArgClasses = new ConcurrentHashMap<Class<?>, Class<?>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
this.superClassesCache = new ConcurrentHashMap<Class<?>, Collection<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
|
||||||
|
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribe(Object listener) {
|
public void unsubscribe(Object listener) {
|
||||||
@ -77,8 +103,9 @@ public class SubscriptionManager {
|
|||||||
Class<?> listenerClass = listener.getClass();
|
Class<?> listenerClass = listener.getClass();
|
||||||
Collection<Subscription> subscriptions;
|
Collection<Subscription> subscriptions;
|
||||||
boolean nothingLeft = true;
|
boolean nothingLeft = true;
|
||||||
|
Lock UPDATE = this.LOCK.updateLock();
|
||||||
try {
|
try {
|
||||||
this.LOCK.updateLock().lock();
|
UPDATE.lock();
|
||||||
|
|
||||||
subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
||||||
|
|
||||||
@ -87,6 +114,7 @@ public class SubscriptionManager {
|
|||||||
subscription.unsubscribe(listener);
|
subscription.unsubscribe(listener);
|
||||||
|
|
||||||
boolean isEmpty = subscription.isEmpty();
|
boolean isEmpty = subscription.isEmpty();
|
||||||
|
|
||||||
if (isEmpty) {
|
if (isEmpty) {
|
||||||
// single or multi?
|
// single or multi?
|
||||||
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
|
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
|
||||||
@ -103,17 +131,12 @@ public class SubscriptionManager {
|
|||||||
if (subs.isEmpty()) {
|
if (subs.isEmpty()) {
|
||||||
// remove element
|
// remove element
|
||||||
this.subscriptionsPerMessageSingle.remove(clazz);
|
this.subscriptionsPerMessageSingle.remove(clazz);
|
||||||
|
|
||||||
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
|
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||||
|
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Collection<Subscription> superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz);
|
|
||||||
// if (superSubs != null) {
|
|
||||||
// superSubs.remove(subscription);
|
|
||||||
//
|
|
||||||
// if (superSubs.isEmpty()) {
|
|
||||||
// // remove element
|
|
||||||
// this.superClassSubscriptionsPerMessageSingle.remove(clazz);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
} else {
|
} else {
|
||||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||||
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
|
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
|
||||||
@ -130,14 +153,12 @@ public class SubscriptionManager {
|
|||||||
subs.remove(subscription);
|
subs.remove(subscription);
|
||||||
|
|
||||||
if (subs.isEmpty()) {
|
if (subs.isEmpty()) {
|
||||||
this.LOCK.writeLock().lock();
|
|
||||||
// remove tree element
|
// remove tree element
|
||||||
switch (size) {
|
switch (size) {
|
||||||
case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
|
case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
|
||||||
case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
|
case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
|
||||||
default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
|
default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
|
||||||
}
|
}
|
||||||
this.LOCK.writeLock().unlock();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -149,13 +170,14 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (nothingLeft) {
|
if (nothingLeft) {
|
||||||
this.LOCK.writeLock().lock();
|
Lock WRITE = this.LOCK.writeLock();
|
||||||
|
WRITE.lock();
|
||||||
this.subscriptionsPerListener.remove(listenerClass);
|
this.subscriptionsPerListener.remove(listenerClass);
|
||||||
this.LOCK.writeLock().unlock();
|
WRITE.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
this.LOCK.updateLock().unlock();
|
UPDATE.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@ -172,32 +194,36 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Collection<Subscription> subscriptions;
|
Collection<Subscription> subscriptions;
|
||||||
|
Lock UPDATE = this.LOCK.updateLock();
|
||||||
try {
|
try {
|
||||||
this.LOCK.updateLock().lock();
|
UPDATE.lock();
|
||||||
subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
||||||
|
|
||||||
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
|
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||||
|
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
|
||||||
if (subscriptions != null) {
|
if (subscriptions != null) {
|
||||||
// subscriptions already exist and must only be updated
|
// subscriptions already exist and must only be updated
|
||||||
for (Subscription subscription : subscriptions) {
|
for (Subscription subscription : subscriptions) {
|
||||||
subscription.subscribe(listener);
|
subscription.subscribe(listener);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// a listener is subscribed for the first time
|
Lock WRITE = this.LOCK.writeLock();
|
||||||
try {
|
try {
|
||||||
this.LOCK.writeLock().lock(); // upgrade updatelock to write lock, Avoid DCL
|
WRITE.lock(); // upgrade updatelock to write lock, Avoid DCL
|
||||||
|
|
||||||
// a listener is subscribed for the first time
|
// a listener is subscribed for the first time
|
||||||
List<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
|
Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
|
||||||
if (messageHandlers.isEmpty()) {
|
if (messageHandlers.isEmpty()) {
|
||||||
// remember the class as non listening class if no handlers are found
|
// remember the class as non listening class if no handlers are found
|
||||||
this.nonListeners.put(listenerClass, this.holder);
|
this.nonListeners.put(listenerClass, this.holder);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock
|
subscriptions = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR);
|
||||||
subscriptions = new StrongConcurrentSet<Subscription>(messageHandlers.size());
|
|
||||||
|
|
||||||
// create subscriptions for all detected message handlers
|
// create NEW subscriptions for all detected message handlers
|
||||||
for (MessageHandler messageHandler : messageHandlers) {
|
for (MessageHandler messageHandler : messageHandlers) {
|
||||||
// create the subscription
|
// create the subscription
|
||||||
Subscription subscription = new Subscription(messageHandler);
|
Subscription subscription = new Subscription(messageHandler);
|
||||||
@ -206,40 +232,32 @@ public class SubscriptionManager {
|
|||||||
// single or multi?
|
// single or multi?
|
||||||
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
|
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
|
||||||
int size = handledMessageTypes.length;
|
int size = handledMessageTypes.length;
|
||||||
|
boolean acceptsSubtypes = subscription.acceptsSubtypes();
|
||||||
|
|
||||||
if (size == 1) {
|
if (size == 1) {
|
||||||
// single
|
// single
|
||||||
Class<?> clazz = handledMessageTypes[0];
|
Class<?> clazz = handledMessageTypes[0];
|
||||||
|
|
||||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
|
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
|
||||||
// Collection<Subscription> superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz);
|
|
||||||
if (subs == null) {
|
if (subs == null) {
|
||||||
// NOTE: Order is important for safe publication
|
// NOTE: Order is important for safe publication
|
||||||
subs = new StrongConcurrentSet<Subscription>(2);
|
subs = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR);
|
||||||
subs.add(subscription);
|
subs.add(subscription);
|
||||||
this.subscriptionsPerMessageSingle.put(clazz, subs);
|
this.subscriptionsPerMessageSingle.put(clazz, subs);
|
||||||
|
|
||||||
// if (subscription.acceptsSubtypes()) {
|
|
||||||
// superSubs = new StrongConcurrentSet<Subscription>(2);
|
|
||||||
// superSubs.add(subscription);
|
|
||||||
// this.superClassSubscriptionsPerMessageSingle.put(clazz, superSubs);
|
|
||||||
// }
|
|
||||||
} else {
|
} else {
|
||||||
subs.add(subscription);
|
subs.add(subscription);
|
||||||
|
|
||||||
// if (subscription.acceptsSubtypes()) {
|
|
||||||
// superSubs.add(subscription);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
|
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
|
||||||
if (subscription.isVarArg()) {
|
if (subscription.isVarArg()) {
|
||||||
Class<?> componentType = clazz.getComponentType();
|
Class<?> componentType = clazz.getComponentType();
|
||||||
this.varArgClasses.putIfAbsent(componentType, clazz);
|
this.varArgClasses.put(componentType, clazz);
|
||||||
|
|
||||||
// since it's vararg, this means that it's an ARRAY, so we ALSO
|
// since it's vararg, this means that it's an ARRAY, so we ALSO
|
||||||
// have to add the component classes of the array
|
// have to add the component classes of the array
|
||||||
if (subscription.acceptsSubtypes()) {
|
if (acceptsSubtypes) {
|
||||||
ArrayDeque<Class<?>> superClasses = setupSuperClassCache(componentType);
|
Collection<Class<?>> superClasses = setupSuperClassCache(componentType);
|
||||||
|
|
||||||
// have to setup each vararg chain
|
// have to setup each vararg chain
|
||||||
for (Class<?> superClass : superClasses) {
|
for (Class<?> superClass : superClasses) {
|
||||||
@ -250,7 +268,7 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (subscription.acceptsSubtypes()) {
|
} else if (acceptsSubtypes) {
|
||||||
setupSuperClassCache(clazz);
|
setupSuperClassCache(clazz);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -261,7 +279,7 @@ public class SubscriptionManager {
|
|||||||
switch (size) {
|
switch (size) {
|
||||||
case 2: {
|
case 2: {
|
||||||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
|
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
|
||||||
if (subscription.acceptsSubtypes()) {
|
if (acceptsSubtypes) {
|
||||||
setupSuperClassCache(handledMessageTypes[0]);
|
setupSuperClassCache(handledMessageTypes[0]);
|
||||||
setupSuperClassCache(handledMessageTypes[1]);
|
setupSuperClassCache(handledMessageTypes[1]);
|
||||||
}
|
}
|
||||||
@ -269,7 +287,7 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
case 3: {
|
case 3: {
|
||||||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
|
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
|
||||||
if (subscription.acceptsSubtypes()) {
|
if (acceptsSubtypes) {
|
||||||
setupSuperClassCache(handledMessageTypes[0]);
|
setupSuperClassCache(handledMessageTypes[0]);
|
||||||
setupSuperClassCache(handledMessageTypes[1]);
|
setupSuperClassCache(handledMessageTypes[1]);
|
||||||
setupSuperClassCache(handledMessageTypes[2]);
|
setupSuperClassCache(handledMessageTypes[2]);
|
||||||
@ -278,7 +296,7 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
|
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
|
||||||
if (subscription.acceptsSubtypes()) {
|
if (acceptsSubtypes) {
|
||||||
for (Class<?> c : handledMessageTypes) {
|
for (Class<?> c : handledMessageTypes) {
|
||||||
setupSuperClassCache(c);
|
setupSuperClassCache(c);
|
||||||
}
|
}
|
||||||
@ -289,7 +307,7 @@ public class SubscriptionManager {
|
|||||||
|
|
||||||
Collection<Subscription> subs = tree.getValue();
|
Collection<Subscription> subs = tree.getValue();
|
||||||
if (subs == null) {
|
if (subs == null) {
|
||||||
subs = new LinkedList<Subscription>();
|
subs = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR);
|
||||||
tree.putValue(subs);
|
tree.putValue(subs);
|
||||||
}
|
}
|
||||||
subs.add(subscription);
|
subs.add(subscription);
|
||||||
@ -300,11 +318,11 @@ public class SubscriptionManager {
|
|||||||
|
|
||||||
this.subscriptionsPerListener.put(listenerClass, subscriptions);
|
this.subscriptionsPerListener.put(listenerClass, subscriptions);
|
||||||
} finally {
|
} finally {
|
||||||
this.LOCK.writeLock().unlock();
|
WRITE.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.LOCK.updateLock().unlock();
|
UPDATE.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,22 +355,34 @@ public class SubscriptionManager {
|
|||||||
// must be protected by read lock
|
// must be protected by read lock
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
||||||
Collection<Class<?>> types = this.superClassesCache.get(superType);
|
Map<Class<?>, Collection<Subscription>> superClassSubs = this.superClassSubscriptions;
|
||||||
if (types == null || types.isEmpty()) {
|
if (superClassSubs == null) {
|
||||||
|
// we haven't created it yet
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
|
Collection<Subscription> subsPerType = superClassSubs.get(superType);
|
||||||
|
|
||||||
for (Class<?> superClass : types) {
|
if (subsPerType == null) {
|
||||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
|
Collection<Class<?>> types = this.superClassesCache.get(superType);
|
||||||
if (subs != null) {
|
if (types == null || types.isEmpty()) {
|
||||||
for (Subscription sub : subs) {
|
return null;
|
||||||
if (sub.acceptsSubtypes()) {
|
}
|
||||||
subsPerType.add(sub);
|
|
||||||
|
subsPerType = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR);
|
||||||
|
|
||||||
|
for (Class<?> superClass : types) {
|
||||||
|
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
|
||||||
|
if (subs != null) {
|
||||||
|
for (Subscription sub : subs) {
|
||||||
|
if (sub.acceptsSubtypes()) {
|
||||||
|
subsPerType.add(sub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
superClassSubs.put(superType, subsPerType);
|
||||||
}
|
}
|
||||||
|
|
||||||
return subsPerType;
|
return subsPerType;
|
||||||
@ -361,6 +391,9 @@ public class SubscriptionManager {
|
|||||||
// must be protected by read lock
|
// must be protected by read lock
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
|
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
|
||||||
|
// Collection<Subscription> subsPerType2 = this.superClassSubscriptions.get();
|
||||||
|
|
||||||
|
|
||||||
// not thread safe. DO NOT MODIFY
|
// not thread safe. DO NOT MODIFY
|
||||||
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
|
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
|
||||||
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
|
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
|
||||||
@ -475,42 +508,21 @@ public class SubscriptionManager {
|
|||||||
return subsPerType;
|
return subsPerType;
|
||||||
}
|
}
|
||||||
|
|
||||||
// not a thread safe collection, but it doesn't matter
|
private Collection<Class<?>> setupSuperClassCache(Class<?> clazz) {
|
||||||
private ArrayDeque<Class<?>> setupSuperClassCache(Class<?> clazz) {
|
Collection<Class<?>> types = this.superClassesCache.get(clazz);
|
||||||
ArrayDeque<Class<?>> types = this.superClassesCache.get(clazz);
|
|
||||||
|
|
||||||
if (types == null) {
|
if (types == null) {
|
||||||
// it doesn't matter if concurrent access stomps on values, since they are always the same.
|
// it doesn't matter if concurrent access stomps on values, since they are always the same.
|
||||||
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
|
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
|
||||||
types = new ArrayDeque<Class<?>>(superTypes);
|
types = new ArrayDeque<Class<?>>(superTypes);
|
||||||
// NOTE: no need to write lock, since race conditions will result in duplicate answers (which we don't care about)
|
|
||||||
|
// race conditions will result in duplicate answers, which we don't care about
|
||||||
this.superClassesCache.put(clazz, types);
|
this.superClassesCache.put(clazz, types);
|
||||||
}
|
}
|
||||||
|
|
||||||
return types;
|
return types;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
///////////////
|
|
||||||
// a var-arg handler might match
|
|
||||||
///////////////
|
|
||||||
private void addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) {
|
|
||||||
// tricky part. We have to check the ARRAY version
|
|
||||||
Collection<Subscription> subs;
|
|
||||||
|
|
||||||
Class<?> varArgClass = this.varArgClasses.get(messageType);
|
|
||||||
if (varArgClass != null) {
|
|
||||||
// also add all subscriptions that match super types
|
|
||||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
|
||||||
if (subs != null) {
|
|
||||||
for (Subscription sub : subs) {
|
|
||||||
subscriptions.add(sub);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// must be protected by read lock
|
// must be protected by read lock
|
||||||
public Collection<Subscription> getVarArgs(Class<?> clazz) {
|
public Collection<Subscription> getVarArgs(Class<?> clazz) {
|
||||||
Class<?> varArgClass = this.varArgClasses.get(clazz);
|
Class<?> varArgClass = this.varArgClasses.get(clazz);
|
||||||
@ -554,81 +566,6 @@ public class SubscriptionManager {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
///////////////
|
|
||||||
// a var-arg handler might match
|
|
||||||
// tricky part. We have to check the ARRAY version
|
|
||||||
///////////////
|
|
||||||
private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayDeque<Class<?>> types1) {
|
|
||||||
Collection<Subscription> subs;
|
|
||||||
|
|
||||||
Class<?> varArgClass = this.varArgClasses.get(messageType);
|
|
||||||
if (varArgClass != null) {
|
|
||||||
// also add all subscriptions that match super types
|
|
||||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
|
||||||
if (subs != null) {
|
|
||||||
for (Subscription sub : subs) {
|
|
||||||
subscriptions.add(sub);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Class<?> eventSuperType : types1) {
|
|
||||||
varArgClass = this.varArgClasses.get(eventSuperType);
|
|
||||||
if (varArgClass != null) {
|
|
||||||
// also add all subscriptions that match super types
|
|
||||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
|
||||||
if (subs != null) {
|
|
||||||
for (Subscription sub : subs) {
|
|
||||||
subscriptions.add(sub);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index,
|
|
||||||
IdentityObjectTree<Class<?>, Collection<Subscription>> tree, Class<?>[] messageTypes) {
|
|
||||||
|
|
||||||
Class<?> classType = messageTypes[index];
|
|
||||||
// get all the super types, if there are any.
|
|
||||||
ArrayDeque<Class<?>> superClasses = setupSuperClassCache(classType);
|
|
||||||
|
|
||||||
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf;
|
|
||||||
Collection<Subscription> subs;
|
|
||||||
|
|
||||||
Class<?> superClass = classType;
|
|
||||||
int i;
|
|
||||||
int newIndex;
|
|
||||||
|
|
||||||
// for (i = -1; i < superClasses.size(); i++) {
|
|
||||||
// if (i > -1) {
|
|
||||||
// superClass = superClasses.get(i);
|
|
||||||
// }
|
|
||||||
// leaf = tree.getLeaf(superClass);
|
|
||||||
// if (leaf != null) {
|
|
||||||
// newIndex = index+1;
|
|
||||||
// if (index == length) {
|
|
||||||
// subs = leaf.getValue();
|
|
||||||
// if (subs != null) {
|
|
||||||
// for (Subscription sub : subs) {
|
|
||||||
// if (sub.handlesMessageType(messageTypes)) {
|
|
||||||
// subscriptions.add(sub);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
public void readLock() {
|
public void readLock() {
|
||||||
this.LOCK.readLock().lock();
|
this.LOCK.readLock().lock();
|
||||||
}
|
}
|
||||||
|
@ -162,7 +162,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||||||
Overloading.ListenerBase.class,
|
Overloading.ListenerBase.class,
|
||||||
Overloading.ListenerSub.class);
|
Overloading.ListenerSub.class);
|
||||||
|
|
||||||
SubscriptionManager subscriptionManager = new SubscriptionManager();
|
SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits);
|
||||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
||||||
|
|
||||||
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
|
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
|
||||||
@ -181,7 +181,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
|
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
|
||||||
final SubscriptionManager subscriptionManager = new SubscriptionManager();
|
final SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits);
|
||||||
|
|
||||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user