Cuncurrent sub/pub. 169ish ns/op VS 256 ns/op

This commit is contained in:
nathan 2015-02-20 19:54:53 +01:00
parent 2be96d7ff6
commit d0584d52b0
4 changed files with 255 additions and 226 deletions

View File

@ -44,7 +44,6 @@ public class MultiMBassador implements IMessageBus {
// this(2);
}
public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 thread
@ -128,13 +127,11 @@ public class MultiMBassador implements IMessageBus {
}
}
@Override
public void publish(Object message) {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
// manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
// Run subscriptions
@ -144,9 +141,8 @@ public class MultiMBassador implements IMessageBus {
sub.publishToSubscription(this, message);
}
} else {
// Dead Event must EXACTLY MATCH (no subclasses or varargs permitted)
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : deadSubscriptions) {
@ -167,7 +163,6 @@ public class MultiMBassador implements IMessageBus {
sub.publishToSubscription(this, message);
}
}
// manager.readUnLock();
}
@SuppressWarnings("null")

View File

@ -18,7 +18,6 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
}
public StrongConcurrentSet(int size, float loadFactor) {
// super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1));
super(new Reference2ReferenceOpenHashMap<T, ISetEntry<T>>(size, loadFactor));
}

View File

@ -91,47 +91,47 @@ public class Subscription {
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
try {
for (Object listener : listeners) {
for (Object listener : listeners) {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (IllegalArgumentException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " + message.getClass()
// + "Expected: " + handler.getParameterTypes()[0])
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (InvocationTargetException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (Throwable e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
}
} catch (IllegalAccessException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (IllegalArgumentException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " + message.getClass()
// + "Expected: " + handler.getParameterTypes()[0])
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (InvocationTargetException e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
} catch (Throwable e) {
e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getName())
//// .setListener(listener)
// .setPublishedObject(message));
}
}
}

View File

@ -10,7 +10,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.multi.common.IdentityObjectTree;
@ -40,7 +39,7 @@ public class SubscriptionManager {
// the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentHashMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
@ -50,7 +49,7 @@ public class SubscriptionManager {
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final ConcurrentHashMap<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final Object holder = new Object[0];
@ -78,7 +77,7 @@ public class SubscriptionManager {
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.superClassesCache = new ConcurrentHashMap<Class<?>, FastEntrySet<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
@ -91,92 +90,69 @@ public class SubscriptionManager {
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, FastEntrySet<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
}
public void readLock() {
this.LOCK.readLock().lock();
}
public void readUnLock() {
this.LOCK.readLock().unlock();
}
public void unsubscribe(Object listener) {
if (listener == null) {
return;
}
Class<?> listenerClass = listener.getClass();
Collection<Subscription> subscriptions;
boolean nothingLeft = true;
Lock UPDATE = this.LOCK.writeLock();
try {
UPDATE.lock();
// these are a concurrent collection
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
boolean isEmpty = subscription.isEmpty();
if (isEmpty) {
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
// NOTE: Order is important for safe publication
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove element
this.subscriptionsPerMessageSingle.remove(clazz);
resetSuperClassSubs();
}
}
} else {
// NOTE: Not thread-safe! must be synchronized in outer scope
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
switch (size) {
case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
}
if (tree != null) {
Collection<Subscription> subs = tree.getValue();
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove tree element
switch (size) {
case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
}
}
}
}
}
}
nothingLeft &= isEmpty;
}
// boolean isEmpty = subscription.isEmpty();
//
// if (isEmpty) {
// // single or multi?
// Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
// int size = handledMessageTypes.length;
// if (size == 1) {
// // single
// Class<?> clazz = handledMessageTypes[0];
//
// // NOTE: Order is important for safe publication
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove element
// this.subscriptionsPerMessageSingle.remove(clazz);
//
// resetSuperClassSubs();
// }
// }
// } else {
// // NOTE: Not thread-safe! must be synchronized in outer scope
// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
// switch (size) {
// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
// }
//
// if (tree != null) {
// Collection<Subscription> subs = tree.getValue();
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove tree element
// switch (size) {
// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
// }
// }
// }
// }
// }
// }
}
if (nothingLeft) {
this.subscriptionsPerListener.remove(listenerClass);
}
} finally {
UPDATE.unlock();
}
return;
@ -201,121 +177,180 @@ public class SubscriptionManager {
return;
}
Collection<Subscription> subscriptions;
Lock WRITE = this.LOCK.writeLock();
try {
WRITE.lock();
subscriptions = this.subscriptionsPerListener.get(listenerClass);
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions == null) {
// a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
if (subscriptions != null) {
// subscriptions already exist and must only be updated
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
}
if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, this.holder);
} else {
// a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, this.holder);
Collection<Subscription> putIfAbsent = this.subscriptionsPerListener.putIfAbsent(listenerClass, this.subInitialValue.get());
if (putIfAbsent != null) {
subscriptions = putIfAbsent;
} else {
subscriptions = new StrongConcurrentSet<Subscription>(handlersSize, this.LOAD_FACTOR);
// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, this.MAP_STRIPING));
// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1));
// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR));
this.subscriptionsPerListener.put(listenerClass, subscriptions);
subscriptions = this.subInitialValue.get();
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
}
resetSuperClassSubs();
resetSuperClassSubs();
// create NEW subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
// create NEW subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
subscriptions.add(subscription);
subscriptions.add(subscription);
//
// save the subscription per message type
//
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
boolean acceptsSubtypes = subscription.acceptsSubtypes();
//
// save the subscription per message type
//
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
boolean acceptsSubtypes = subscription.acceptsSubtypes();
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
if (putIfAbsent != null) {
subs = putIfAbsent;
} else {
subs = this.subInitialValue.get();
// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1)));
// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR)));
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
// this.subInitialValue.set(new ArrayDeque<Subscription>(8));
}
}
subs.add(subscription);
if (acceptsSubtypes) {
// race conditions will result in duplicate answers, which we don't care about
setupSuperClassCache(clazz);
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
if (putIfAbsent != null) {
subs = putIfAbsent;
} else {
subs = this.subInitialValue.get();
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
}
}
else {
// // NOTE: Not thread-safe! must be synchronized in outer scope
// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
// switch (size) {
// case 2: {
// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
// if (acceptsSubtypes) {
// setupSuperClassCache(handledMessageTypes[0]);
// setupSuperClassCache(handledMessageTypes[1]);
// }
// break;
// }
// case 3: {
// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
// if (acceptsSubtypes) {
// setupSuperClassCache(handledMessageTypes[0]);
// setupSuperClassCache(handledMessageTypes[1]);
// setupSuperClassCache(handledMessageTypes[2]);
// }
// break;
// }
// default: {
// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
// if (acceptsSubtypes) {
// for (Class<?> c : handledMessageTypes) {
// setupSuperClassCache(c);
// }
// }
// break;
// }
// }
//
// Collection<Subscription> subs = tree.getValue();
// if (subs == null) {
// subs = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR);
// tree.putValue(subs);
// }
// subs.add(subscription);
subs.add(subscription);
if (acceptsSubtypes) {
// race conditions will result in duplicate answers, which we don't care about
setupSuperClassCache(clazz);
}
}
}
}
} finally {
WRITE.unlock();
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
}
}
//
// if (subscriptions != null) {
// // subscriptions already exist and must only be updated
// for (Subscription subscription : subscriptions) {
// subscription.subscribe(listener);
// }
// }
// else {
// // a listener is subscribed for the first time
// Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
// int handlersSize = messageHandlers.size();
//
// if (handlersSize == 0) {
// // remember the class as non listening class if no handlers are found
// this.nonListeners.put(listenerClass, this.holder);
// } else {
// subscriptions = new StrongConcurrentSet<Subscription>(handlersSize, this.LOAD_FACTOR);
//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, this.MAP_STRIPING));
//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1));
//// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR));
// this.subscriptionsPerListener.put(listenerClass, subscriptions);
//
// resetSuperClassSubs();
//
// // create NEW subscriptions for all detected message handlers
// for (MessageHandler messageHandler : messageHandlers) {
// // create the subscription
// Subscription subscription = new Subscription(messageHandler);
// subscription.subscribe(listener);
//
// subscriptions.add(subscription);
//
// //
// // save the subscription per message type
// //
// // single or multi?
// Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
// int size = handledMessageTypes.length;
// boolean acceptsSubtypes = subscription.acceptsSubtypes();
//
// if (size == 1) {
// // single
// Class<?> clazz = handledMessageTypes[0];
//
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
// if (subs == null) {
// Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
// if (putIfAbsent != null) {
// subs = putIfAbsent;
// } else {
// subs = this.subInitialValue.get();
//// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1)));
//// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR)));
// this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
//// this.subInitialValue.set(new ArrayDeque<Subscription>(8));
// }
// }
//
// subs.add(subscription);
//
// if (acceptsSubtypes) {
// // race conditions will result in duplicate answers, which we don't care about
// setupSuperClassCache(clazz);
// }
// }
// else {
//// // NOTE: Not thread-safe! must be synchronized in outer scope
//// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
////
//// switch (size) {
//// case 2: {
//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
//// if (acceptsSubtypes) {
//// setupSuperClassCache(handledMessageTypes[0]);
//// setupSuperClassCache(handledMessageTypes[1]);
//// }
//// break;
//// }
//// case 3: {
//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
//// if (acceptsSubtypes) {
//// setupSuperClassCache(handledMessageTypes[0]);
//// setupSuperClassCache(handledMessageTypes[1]);
//// setupSuperClassCache(handledMessageTypes[2]);
//// }
//// break;
//// }
//// default: {
//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
//// if (acceptsSubtypes) {
//// for (Class<?> c : handledMessageTypes) {
//// setupSuperClassCache(c);
//// }
//// }
//// break;
//// }
//// }
////
//// Collection<Subscription> subs = tree.getValue();
//// if (subs == null) {
//// subs = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR);
//// tree.putValue(subs);
//// }
//// subs.add(subscription);
// }
// }
// }
// }
}
// must be protected by read lock