WIP - more even dist of performance (smaller up/down spikes). Using faster collections. SUB/UNSUB is write-locked

This commit is contained in:
nathan 2015-02-20 01:46:03 +01:00
parent 86d04c899c
commit 13cee46f6c
9 changed files with 214 additions and 248 deletions

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -150,11 +149,9 @@ public class MultiMBassador implements IMessageBus {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, deadMessage); sub.publishToSubscription(this, deadMessage);
} }
} }
} }
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass); Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses // now get superClasses
if (superSubscriptions != null) { if (superSubscriptions != null) {
@ -172,7 +169,6 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass1 = message1.getClass(); Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass(); Class<?> messageClass2 = message2.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
@ -183,8 +179,6 @@ public class MultiMBassador implements IMessageBus {
} }
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass1, messageClass2);
manager.readUnLock();
// Run subscriptions // Run subscriptions
@ -212,27 +206,6 @@ public class MultiMBassador implements IMessageBus {
sub.publishToSubscription(this, message1, message2); sub.publishToSubscription(this, message1, message2);
} }
} }
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(messageClass1, 2);
vararg[0] = message1;
vararg[1] = message2;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
} }
@SuppressWarnings("null") @SuppressWarnings("null")
@ -243,7 +216,6 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass1 = message1.getClass(); Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass(); Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass(); Class<?> messageClass3 = message3.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
@ -254,8 +226,6 @@ public class MultiMBassador implements IMessageBus {
} }
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass1, messageClass2, messageClass3);
manager.readUnLock();
// Run subscriptions // Run subscriptions
@ -284,27 +254,6 @@ public class MultiMBassador implements IMessageBus {
} }
} }
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(messageClass1, 3);
vararg[0] = message1;
vararg[0] = message2;
vararg[0] = message3;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
} }
@SuppressWarnings("null") @SuppressWarnings("null")
@ -328,7 +277,6 @@ public class MultiMBassador implements IMessageBus {
} }
} }
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
@ -338,13 +286,6 @@ public class MultiMBassador implements IMessageBus {
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
} }
// we don't support super subscriptions for var-args
// Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClasses);
Collection<Subscription> varArgs = null;
if (allSameType) {
varArgs = manager.getVarArgs(messageClasses);
}
manager.readUnLock();
// Run subscriptions // Run subscriptions
if (validSubs) { if (validSubs) {
@ -372,27 +313,6 @@ public class MultiMBassador implements IMessageBus {
// } // }
// } // }
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(first, size);
for (int i=0;i<size;i++) {
vararg[i] = messages[i];
}
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
} }
@Override @Override
@ -405,17 +325,40 @@ public class MultiMBassador implements IMessageBus {
} }
}; };
try { // try {
this.dispatchQueue.transfer(runnable); // this.dispatchQueue.put(runnable);
} catch (InterruptedException e) { // } catch (InterruptedException e) {
e.printStackTrace(); // e.printStackTrace();
// log.error(e); // // log.error(e);
//
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// }
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e) // int counter = 200;
.setPublishedObject(message)); // while (!this.dispatchQueue.offer(runnable)) {
} // if (counter > 0) {
// --counter;
// LockSupport.parkNanos(1L);
// } else {
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
// break;
// }
// }
} }
} }

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.multi.common;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import sun.reflect.generics.reflectiveObjects.NotImplementedException; import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@ -17,12 +18,12 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
* @author bennidi * @author bennidi
* Date: 2/12/12 * Date: 2/12/12
*/ */
public abstract class AbstractConcurrentSet<T> implements Collection<T> { public abstract class AbstractConcurrentSet<T> implements Set<T> {
// Internal state // Internal state
protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup private final transient Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected Entry<T> head; // reference to the first element protected transient Entry<T> head; // reference to the first element
protected AbstractConcurrentSet(Map<T, ISetEntry<T>> entries) { protected AbstractConcurrentSet(Map<T, ISetEntry<T>> entries) {
this.entries = entries; this.entries = entries;

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -21,6 +22,12 @@ public class IdentityObjectTree<KEY, VALUE> {
public IdentityObjectTree() { public IdentityObjectTree() {
} }
// can be overridded to provide a custom backing map
protected Map<KEY, IdentityObjectTree<KEY, VALUE>> createChildren() {
// return new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f, 1);
return new Reference2ReferenceOpenHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f);
}
public VALUE getValue() { public VALUE getValue() {
VALUE returnValue = this.value; VALUE returnValue = this.value;
return returnValue; return returnValue;
@ -219,7 +226,6 @@ public class IdentityObjectTree<KEY, VALUE> {
return leaf; return leaf;
} }
public final IdentityObjectTree<KEY, VALUE> createLeaf(KEY key, VALUE value, boolean setValue) { public final IdentityObjectTree<KEY, VALUE> createLeaf(KEY key, VALUE value, boolean setValue) {
if (key == null) { if (key == null) {
return null; return null;
@ -228,7 +234,7 @@ public class IdentityObjectTree<KEY, VALUE> {
IdentityObjectTree<KEY, VALUE> objectTree; IdentityObjectTree<KEY, VALUE> objectTree;
if (this.children == null) { if (this.children == null) {
this.children = new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, .8f, 1); this.children = createChildren();
// might as well add too // might as well add too
objectTree = new IdentityObjectTree<KEY, VALUE>(); objectTree = new IdentityObjectTree<KEY, VALUE>();

View File

@ -1,11 +1,12 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import it.unimi.dsi.fastutil.objects.Object2BooleanOpenHashMap;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement; import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.Collections;
import java.util.Set; import java.util.Set;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
@ -20,7 +21,10 @@ public class ReflectionUtils
// modified by dorkbox, llc 2015 // modified by dorkbox, llc 2015
public static Collection<Method> getMethods(Class<?> target) { public static Collection<Method> getMethods(Class<?> target) {
Collection<Method> methods = new ArrayDeque<Method>(); return getMethods(target, new StrongConcurrentSet<Method>());
}
public static Collection<Method> getMethods(Class<?> target, Collection<Method> methods) {
try { try {
for (Method method : target.getDeclaredMethods()) { for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) { if (getAnnotation(method, Handler.class) != null) {
@ -31,7 +35,7 @@ public class ReflectionUtils
} }
if (!target.equals(Object.class)) { if (!target.equals(Object.class)) {
methods.addAll(getMethods(target.getSuperclass())); getMethods(target.getSuperclass(), methods);
} }
return methods; return methods;
} }
@ -66,7 +70,8 @@ public class ReflectionUtils
* @return A set of classes, each representing a super type of the root class * @return A set of classes, each representing a super type of the root class
*/ */
public static Set<Class<?>> getSuperTypes(Class<?> from) { public static Set<Class<?>> getSuperTypes(Class<?> from) {
Set<Class<?>> superclasses = new HashSet<Class<?>>(); Set<Class<?>> superclasses = new StrongConcurrentSet<Class<?>>(8, .8f);
collectInterfaces( from, superclasses ); collectInterfaces( from, superclasses );
while ( !from.equals( Object.class ) && !from.isInterface() ) { while ( !from.equals( Object.class ) && !from.isInterface() ) {
@ -104,7 +109,7 @@ public class ReflectionUtils
* @param <A> Class of annotation type * @param <A> Class of annotation type
* @return Annotation instance or null * @return Annotation instance or null
*/ */
private static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType, Set<AnnotatedElement> visited) { private static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType, Collection<AnnotatedElement> visited) {
if( visited.contains(from) ) { if( visited.contains(from) ) {
return null; return null;
} }
@ -123,7 +128,7 @@ public class ReflectionUtils
} }
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){ public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){
return getAnnotation(from, annotationType, new HashSet<AnnotatedElement>()); return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap<AnnotatedElement>(8, .8f)));
} }
// //

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import java.util.IdentityHashMap; import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
import java.util.Iterator; import java.util.Iterator;
/** /**
@ -13,11 +14,12 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public StrongConcurrentSet() { public StrongConcurrentSet() {
this(16, .75f); this(16, 0.75f);
} }
public StrongConcurrentSet(int size, float lOAD_FACTOR) { public StrongConcurrentSet(int size, float loadFactor) {
super(new IdentityHashMap<T, ISetEntry<T>>(size)); // super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1));
super(new Reference2ReferenceOpenHashMap<T, ISetEntry<T>>(size, loadFactor));
} }
@Override @Override

View File

@ -1,33 +1,34 @@
package net.engio.mbassy.multi.listener; package net.engio.mbassy.multi.listener;
import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
/** /**
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, * All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message
* a message listener is any object capable of receiving messages by means of defined message handlers. * listener is any object capable of receiving messages by means of defined message handlers. There are no restrictions about the number of
* There are no restrictions about the number of allowed message handlers in a message listener. * allowed message handlers in a message listener.
* *
* A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler * A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler definition it
* definition it contains. * contains.
* *
* This class is an internal representation of a message listener used to encapsulate all relevant objects * This class is an internal representation of a message listener used to encapsulate all relevant objects and data about that message
* and data about that message listener, especially all its handlers. * listener, especially all its handlers. There will be only one instance of MessageListener per message listener class and message bus
* There will be only one instance of MessageListener per message listener class and message bus instance. * instance.
* *
* @author bennidi * @author bennidi Date: 12/16/12
* Date: 12/16/12
*/ */
public class MessageListener { public class MessageListener {
private Collection<MessageHandler> handlers = new ArrayDeque<MessageHandler>(); private final Collection<MessageHandler> handlers;
private Class<?> listenerDefinition; private Class<?> listenerDefinition;
public MessageListener(Class<?> listenerDefinition) { public MessageListener(Class<?> listenerDefinition, int size) {
this.listenerDefinition = listenerDefinition; this.handlers = new StrongConcurrentSet<MessageHandler>(size, 0.8F);
this.listenerDefinition = listenerDefinition;
} }
public boolean isFromListener(Class<?> listener){ public boolean isFromListener(Class<?> listener) {
return this.listenerDefinition.equals(listener); return this.listenerDefinition.equals(listener);
} }

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.multi.listener; package net.engio.mbassy.multi.listener;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.ReflectionUtils; import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
/** /**
* The meta data reader is responsible for parsing and validating message handler configurations. * The meta data reader is responsible for parsing and validating message handler configurations.
@ -23,19 +23,19 @@ public class MetadataReader {
Collection<Method> allHandlers = ReflectionUtils.getMethods(target); Collection<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
Collection<Method> bottomMostHandlers = new ArrayDeque<Method>(); Collection<Method> bottomMostHandlers = new StrongConcurrentSet<Method>(allHandlers.size(), .8F);
for (Method handler : allHandlers) { for (Method handler : allHandlers) {
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) { if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
bottomMostHandlers.add(handler); bottomMostHandlers.add(handler);
} }
} }
MessageListener listenerMetadata = new MessageListener(target); MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size());
// for each handler there will be no overriding method that specifies @Handler annotation // for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method // but an overriding method does inherit the listener configuration of the overwritten method
for (Method handler : bottomMostHandlers) { for (Method handler : bottomMostHandlers) {
Handler handlerConfig = ReflectionUtils.getAnnotation( handler, Handler.class); Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class);
if (handlerConfig == null || !handlerConfig.enabled()) { if (handlerConfig == null || !handlerConfig.enabled()) {
continue; // disabled or invalid listeners are ignored continue; // disabled or invalid listeners are ignored
} }

View File

@ -37,9 +37,8 @@ public class Subscription {
private final Collection<Object> listeners; private final Collection<Object> listeners;
Subscription(MessageHandler handler) { Subscription(MessageHandler handler) {
// this.listeners = new WeakConcurrentSet<Object>();
this.listeners = new StrongConcurrentSet<Object>();
this.handlerMetadata = handler; this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSet<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) { if (handler.isSynchronized()) {
@ -92,43 +91,43 @@ public class Subscription {
Method handler = this.handlerMetadata.getHandler(); Method handler = this.handlerMetadata.getHandler();
IHandlerInvocation invocation = this.invocation; IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) { try {
try { for (Object listener : listeners) {
invocation.invoke(listener, handler, message); invocation.invoke(listener, handler, message);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getName())
.setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getName())
.setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getName())
.setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getName())
.setListener(listener)
.setPublishedObject(message));
} }
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getName())
// .setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getName())
// .setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getName())
// .setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getName())
// .setListener(listener)
.setPublishedObject(message));
} }
} }
} }

View File

@ -40,7 +40,7 @@ public class SubscriptionManager {
// all subscriptions per message type // all subscriptions per message type
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time // write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle; private final ConcurrentHashMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti; private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
@ -67,18 +67,10 @@ public class SubscriptionManager {
private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
public SubscriptionManager(int numberOfThreads) { public SubscriptionManager(int numberOfThreads) {
this.MAP_STRIPING = 1; this.MAP_STRIPING = numberOfThreads;
this.LOAD_FACTOR = 0.8f; this.LOAD_FACTOR = 0.8f;
// this.subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(4); this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
// this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
//
// // only used during SUB/UNSUB
// this.subscriptionsPerListener = new IdentityHashMap<Class<?>, Collection<Subscription>>(4);
//
// this.superClassesCache = new IdentityHashMap<Class<?>, Collection<Class<?>>>(8);
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>(); this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// only used during SUB/UNSUB // only used during SUB/UNSUB
@ -103,7 +95,7 @@ public class SubscriptionManager {
Class<?> listenerClass = listener.getClass(); Class<?> listenerClass = listener.getClass();
Collection<Subscription> subscriptions; Collection<Subscription> subscriptions;
boolean nothingLeft = true; boolean nothingLeft = true;
Lock UPDATE = this.LOCK.updateLock(); Lock UPDATE = this.LOCK.writeLock();
try { try {
UPDATE.lock(); UPDATE.lock();
@ -168,10 +160,7 @@ public class SubscriptionManager {
} }
if (nothingLeft) { if (nothingLeft) {
Lock WRITE = this.LOCK.writeLock();
WRITE.lock();
this.subscriptionsPerListener.remove(listenerClass); this.subscriptionsPerListener.remove(listenerClass);
WRITE.unlock();
} }
} finally { } finally {
@ -181,6 +170,15 @@ public class SubscriptionManager {
return; return;
} }
private final ThreadLocal<Collection<Subscription>> subInitialValue = new ThreadLocal<Collection<Subscription>>() {
@Override
protected java.util.Collection<Subscription> initialValue() {
// return new ArrayDeque<Subscription>(8);
// return Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR));
// return Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, SubscriptionManager.this.LOAD_FACTOR, SubscriptionManager.this.MAP_STRIPING));
return new StrongConcurrentSet<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR);
}
};
// when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"? // when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"?
public void subscribe(Object listener) { public void subscribe(Object listener) {
@ -192,9 +190,9 @@ public class SubscriptionManager {
} }
Collection<Subscription> subscriptions; Collection<Subscription> subscriptions;
Lock UPDATE = this.LOCK.updateLock(); Lock WRITE = this.LOCK.writeLock();
try { try {
UPDATE.lock(); WRITE.lock();
subscriptions = this.subscriptionsPerListener.get(listenerClass); subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) { if (subscriptions != null) {
@ -203,19 +201,19 @@ public class SubscriptionManager {
subscription.subscribe(listener); subscription.subscribe(listener);
} }
} else { } else {
Lock WRITE = this.LOCK.writeLock(); // a listener is subscribed for the first time
try { Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
WRITE.lock(); // upgrade updatelock to write lock, Avoid DCL int handlersSize = messageHandlers.size();
// a listener is subscribed for the first time if (handlersSize == 0) {
Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); // remember the class as non listening class if no handlers are found
if (messageHandlers.isEmpty()) { this.nonListeners.put(listenerClass, this.holder);
// remember the class as non listening class if no handlers are found } else {
this.nonListeners.put(listenerClass, this.holder); subscriptions = new StrongConcurrentSet<Subscription>(handlersSize, this.LOAD_FACTOR);
return; // subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, this.MAP_STRIPING));
} // subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1));
// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR));
subscriptions = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR); this.subscriptionsPerListener.put(listenerClass, subscriptions);
resetSuperClassSubs(); resetSuperClassSubs();
@ -225,6 +223,11 @@ public class SubscriptionManager {
Subscription subscription = new Subscription(messageHandler); Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener); subscription.subscribe(listener);
subscriptions.add(subscription);
//
// save the subscription per message type
//
// single or multi? // single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes(); Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length; int size = handledMessageTypes.length;
@ -236,70 +239,70 @@ public class SubscriptionManager {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) { if (subs == null) {
// NOTE: Order is important for safe publication Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
subs = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR); if (putIfAbsent != null) {
subs.add(subscription); subs = putIfAbsent;
this.subscriptionsPerMessageSingle.put(clazz, subs); } else {
subs = this.subInitialValue.get();
} else { // this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1)));
subs.add(subscription); // this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR)));
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
// this.subInitialValue.set(new ArrayDeque<Subscription>(8));
}
} }
subs.add(subscription);
if (acceptsSubtypes) { if (acceptsSubtypes) {
// race conditions will result in duplicate answers, which we don't care about
setupSuperClassCache(clazz); setupSuperClassCache(clazz);
} }
} }
else { else {
// NOTE: Not thread-safe! must be synchronized in outer scope // // NOTE: Not thread-safe! must be synchronized in outer scope
IdentityObjectTree<Class<?>, Collection<Subscription>> tree; // IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
switch (size) { // switch (size) {
case 2: { // case 2: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); // tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
if (acceptsSubtypes) { // if (acceptsSubtypes) {
setupSuperClassCache(handledMessageTypes[0]); // setupSuperClassCache(handledMessageTypes[0]);
setupSuperClassCache(handledMessageTypes[1]); // setupSuperClassCache(handledMessageTypes[1]);
} // }
break; // break;
} // }
case 3: { // case 3: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); // tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
if (acceptsSubtypes) { // if (acceptsSubtypes) {
setupSuperClassCache(handledMessageTypes[0]); // setupSuperClassCache(handledMessageTypes[0]);
setupSuperClassCache(handledMessageTypes[1]); // setupSuperClassCache(handledMessageTypes[1]);
setupSuperClassCache(handledMessageTypes[2]); // setupSuperClassCache(handledMessageTypes[2]);
} // }
break; // break;
} // }
default: { // default: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); // tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
if (acceptsSubtypes) { // if (acceptsSubtypes) {
for (Class<?> c : handledMessageTypes) { // for (Class<?> c : handledMessageTypes) {
setupSuperClassCache(c); // setupSuperClassCache(c);
} // }
} // }
break; // break;
} // }
} // }
//
Collection<Subscription> subs = tree.getValue(); // Collection<Subscription> subs = tree.getValue();
if (subs == null) { // if (subs == null) {
subs = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR); // subs = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR);
tree.putValue(subs); // tree.putValue(subs);
} // }
subs.add(subscription); // subs.add(subscription);
} }
subscriptions.add(subscription);
} }
this.subscriptionsPerListener.put(listenerClass, subscriptions);
} finally {
WRITE.unlock();
} }
} }
} finally { } finally {
UPDATE.unlock(); WRITE.unlock();
} }
} }
@ -329,7 +332,6 @@ public class SubscriptionManager {
} }
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) { public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, Collection<Subscription>> superClassSubs = this.superClassSubscriptions; Map<Class<?>, Collection<Subscription>> superClassSubs = this.superClassSubscriptions;
@ -346,7 +348,8 @@ public class SubscriptionManager {
return null; return null;
} }
subsPerType = new StrongConcurrentSet<Subscription>(16, this.LOAD_FACTOR); // subsPerType = new StrongConcurrentSet<Subscription>(types.size(), this.LOAD_FACTOR);
subsPerType = new ArrayDeque<Subscription>(types.size() + 1);
for (Class<?> superClass : types) { for (Class<?> superClass : types) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
@ -485,13 +488,19 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
/**
* race conditions will result in duplicate answers, which we don't care if happens
*/
private Collection<Class<?>> setupSuperClassCache(Class<?> clazz) { private Collection<Class<?>> setupSuperClassCache(Class<?> clazz) {
Collection<Class<?>> types = this.superClassesCache.get(clazz); Collection<Class<?>> types = this.superClassesCache.get(clazz);
if (types == null) { if (types == null) {
// it doesn't matter if concurrent access stomps on values, since they are always the same. // it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
types = new ArrayDeque<Class<?>>(superTypes); // types = new ArrayDeque<Class<?>>(superTypes);
types = new StrongConcurrentSet<Class<?>>(superTypes.size(), this.LOAD_FACTOR);
types.addAll(superTypes);
// race conditions will result in duplicate answers, which we don't care about // race conditions will result in duplicate answers, which we don't care about
this.superClassesCache.put(clazz, types); this.superClassesCache.put(clazz, types);