From 13cee46f6c51f143e868e8977b8fe5593565d036 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 20 Feb 2015 01:46:03 +0100 Subject: [PATCH] WIP - more even dist of performance (smaller up/down spikes). Using faster collections. SUB/UNSUB is write-locked --- .../engio/mbassy/multi/MultiMBassador.java | 123 ++++-------- .../multi/common/AbstractConcurrentSet.java | 9 +- .../multi/common/IdentityObjectTree.java | 12 +- .../mbassy/multi/common/ReflectionUtils.java | 19 +- .../multi/common/StrongConcurrentSet.java | 10 +- .../multi/listener/MessageListener.java | 31 +-- .../mbassy/multi/listener/MetadataReader.java | 8 +- .../multi/subscription/Subscription.java | 73 ++++---- .../subscription/SubscriptionManager.java | 177 +++++++++--------- 9 files changed, 214 insertions(+), 248 deletions(-) diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 244b15a..9e28e20 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,6 +1,5 @@ package net.engio.mbassy.multi; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -150,11 +149,9 @@ public class MultiMBassador implements IMessageBus { // this catches all exception types sub.publishToSubscription(this, deadMessage); } - } } - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses if (superSubscriptions != null) { @@ -172,7 +169,6 @@ public class MultiMBassador implements IMessageBus { Class messageClass1 = message1.getClass(); Class messageClass2 = message2.getClass(); - manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); @@ -183,8 +179,6 @@ public class MultiMBassador implements IMessageBus { } Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); - Collection varArgs = manager.getVarArgs(messageClass1, messageClass2); - manager.readUnLock(); // Run subscriptions @@ -212,27 +206,6 @@ public class MultiMBassador implements IMessageBus { sub.publishToSubscription(this, message1, message2); } } - - // now get varargs - if (varArgs != null && !varArgs.isEmpty()) { - // messy, but the ONLY way to do it. - Object[] vararg = null; - - for (Subscription sub : varArgs) { - if (sub.isVarArg()) { - if (vararg == null) { - vararg = (Object[]) Array.newInstance(messageClass1, 2); - vararg[0] = message1; - vararg[1] = message2; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - } - sub.publishToSubscription(this, vararg); - } - } - } } @SuppressWarnings("null") @@ -243,7 +216,6 @@ public class MultiMBassador implements IMessageBus { Class messageClass1 = message1.getClass(); Class messageClass2 = message2.getClass(); Class messageClass3 = message3.getClass(); - manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); @@ -254,8 +226,6 @@ public class MultiMBassador implements IMessageBus { } Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); - Collection varArgs = manager.getVarArgs(messageClass1, messageClass2, messageClass3); - manager.readUnLock(); // Run subscriptions @@ -284,27 +254,6 @@ public class MultiMBassador implements IMessageBus { } } - // now get varargs - if (varArgs != null && !varArgs.isEmpty()) { - // messy, but the ONLY way to do it. - Object[] vararg = null; - - for (Subscription sub : varArgs) { - if (sub.isVarArg()) { - if (vararg == null) { - vararg = (Object[]) Array.newInstance(messageClass1, 3); - vararg[0] = message1; - vararg[0] = message2; - vararg[0] = message3; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - } - sub.publishToSubscription(this, vararg); - } - } - } } @SuppressWarnings("null") @@ -328,7 +277,6 @@ public class MultiMBassador implements IMessageBus { } } - manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClasses); boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); @@ -338,13 +286,6 @@ public class MultiMBassador implements IMessageBus { deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } - // we don't support super subscriptions for var-args - // Collection superSubscriptions = manager.getSuperSubscriptions(messageClasses); - Collection varArgs = null; - if (allSameType) { - varArgs = manager.getVarArgs(messageClasses); - } - manager.readUnLock(); // Run subscriptions if (validSubs) { @@ -372,27 +313,6 @@ public class MultiMBassador implements IMessageBus { // } // } - // now get varargs - if (varArgs != null && !varArgs.isEmpty()) { - // messy, but the ONLY way to do it. - Object[] vararg = null; - - for (Subscription sub : varArgs) { - if (sub.isVarArg()) { - if (vararg == null) { - vararg = (Object[]) Array.newInstance(first, size); - for (int i=0;i 0) { +// --counter; +// LockSupport.parkNanos(1L); +// } else { + try { + this.dispatchQueue.transfer(runnable); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } +// break; +// } +// } } } diff --git a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java index 9e9d023..abd98aa 100644 --- a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java @@ -2,6 +2,7 @@ package net.engio.mbassy.multi.common; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import sun.reflect.generics.reflectiveObjects.NotImplementedException; @@ -17,12 +18,12 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * @author bennidi * Date: 2/12/12 */ -public abstract class AbstractConcurrentSet implements Collection { +public abstract class AbstractConcurrentSet implements Set { // Internal state - protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); - private final Map> entries; // maintain a map of entries for O(log n) lookup - protected Entry head; // reference to the first element + protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); + private final transient Map> entries; // maintain a map of entries for O(log n) lookup + protected transient Entry head; // reference to the first element protected AbstractConcurrentSet(Map> entries) { this.entries = entries; diff --git a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java index 225924d..a4e30d0 100644 --- a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java +++ b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java @@ -1,7 +1,8 @@ package net.engio.mbassy.multi.common; +import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap; + import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** @@ -21,6 +22,12 @@ public class IdentityObjectTree { public IdentityObjectTree() { } + // can be overridded to provide a custom backing map + protected Map> createChildren() { +// return new ConcurrentHashMap>(2, 0.75f, 1); + return new Reference2ReferenceOpenHashMap>(2, 0.75f); + } + public VALUE getValue() { VALUE returnValue = this.value; return returnValue; @@ -219,7 +226,6 @@ public class IdentityObjectTree { return leaf; } - public final IdentityObjectTree createLeaf(KEY key, VALUE value, boolean setValue) { if (key == null) { return null; @@ -228,7 +234,7 @@ public class IdentityObjectTree { IdentityObjectTree objectTree; if (this.children == null) { - this.children = new ConcurrentHashMap>(2, .8f, 1); + this.children = createChildren(); // might as well add too objectTree = new IdentityObjectTree(); diff --git a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java index 9edf883..5f945e5 100644 --- a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java @@ -1,11 +1,12 @@ package net.engio.mbassy.multi.common; +import it.unimi.dsi.fastutil.objects.Object2BooleanOpenHashMap; + import java.lang.annotation.Annotation; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; -import java.util.ArrayDeque; import java.util.Collection; -import java.util.HashSet; +import java.util.Collections; import java.util.Set; import net.engio.mbassy.multi.annotations.Handler; @@ -20,7 +21,10 @@ public class ReflectionUtils // modified by dorkbox, llc 2015 public static Collection getMethods(Class target) { - Collection methods = new ArrayDeque(); + return getMethods(target, new StrongConcurrentSet()); + } + + public static Collection getMethods(Class target, Collection methods) { try { for (Method method : target.getDeclaredMethods()) { if (getAnnotation(method, Handler.class) != null) { @@ -31,7 +35,7 @@ public class ReflectionUtils } if (!target.equals(Object.class)) { - methods.addAll(getMethods(target.getSuperclass())); + getMethods(target.getSuperclass(), methods); } return methods; } @@ -66,7 +70,8 @@ public class ReflectionUtils * @return A set of classes, each representing a super type of the root class */ public static Set> getSuperTypes(Class from) { - Set> superclasses = new HashSet>(); + Set> superclasses = new StrongConcurrentSet>(8, .8f); + collectInterfaces( from, superclasses ); while ( !from.equals( Object.class ) && !from.isInterface() ) { @@ -104,7 +109,7 @@ public class ReflectionUtils * @param Class of annotation type * @return Annotation instance or null */ - private static A getAnnotation( AnnotatedElement from, Class annotationType, Set visited) { + private static A getAnnotation( AnnotatedElement from, Class annotationType, Collection visited) { if( visited.contains(from) ) { return null; } @@ -123,7 +128,7 @@ public class ReflectionUtils } public static A getAnnotation( AnnotatedElement from, Class annotationType){ - return getAnnotation(from, annotationType, new HashSet()); + return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap(8, .8f))); } // diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 1f0a31b..6ee6082 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -1,5 +1,6 @@ package net.engio.mbassy.multi.common; -import java.util.IdentityHashMap; +import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap; + import java.util.Iterator; /** @@ -13,11 +14,12 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - this(16, .75f); + this(16, 0.75f); } - public StrongConcurrentSet(int size, float lOAD_FACTOR) { - super(new IdentityHashMap>(size)); + public StrongConcurrentSet(int size, float loadFactor) { +// super(new ConcurrentHashMap>(size, loadFactor, 1)); + super(new Reference2ReferenceOpenHashMap>(size, loadFactor)); } @Override diff --git a/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java b/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java index d29b5c0..4e4e42a 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java @@ -1,33 +1,34 @@ package net.engio.mbassy.multi.listener; -import java.util.ArrayDeque; import java.util.Collection; +import net.engio.mbassy.multi.common.StrongConcurrentSet; + /** - * All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, - * a message listener is any object capable of receiving messages by means of defined message handlers. - * There are no restrictions about the number of allowed message handlers in a message listener. + * All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message + * listener is any object capable of receiving messages by means of defined message handlers. There are no restrictions about the number of + * allowed message handlers in a message listener. * - * A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler - * definition it contains. + * A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler definition it + * contains. * - * This class is an internal representation of a message listener used to encapsulate all relevant objects - * and data about that message listener, especially all its handlers. - * There will be only one instance of MessageListener per message listener class and message bus instance. + * This class is an internal representation of a message listener used to encapsulate all relevant objects and data about that message + * listener, especially all its handlers. There will be only one instance of MessageListener per message listener class and message bus + * instance. * - * @author bennidi - * Date: 12/16/12 + * @author bennidi Date: 12/16/12 */ public class MessageListener { - private Collection handlers = new ArrayDeque(); + private final Collection handlers; private Class listenerDefinition; - public MessageListener(Class listenerDefinition) { - this.listenerDefinition = listenerDefinition; + public MessageListener(Class listenerDefinition, int size) { + this.handlers = new StrongConcurrentSet(size, 0.8F); + this.listenerDefinition = listenerDefinition; } - public boolean isFromListener(Class listener){ + public boolean isFromListener(Class listener) { return this.listenerDefinition.equals(listener); } diff --git a/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java index d55a1df..b2ced29 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java @@ -1,11 +1,11 @@ package net.engio.mbassy.multi.listener; import java.lang.reflect.Method; -import java.util.ArrayDeque; import java.util.Collection; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.common.ReflectionUtils; +import net.engio.mbassy.multi.common.StrongConcurrentSet; /** * The meta data reader is responsible for parsing and validating message handler configurations. @@ -23,19 +23,19 @@ public class MetadataReader { Collection allHandlers = ReflectionUtils.getMethods(target); // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) - Collection bottomMostHandlers = new ArrayDeque(); + Collection bottomMostHandlers = new StrongConcurrentSet(allHandlers.size(), .8F); for (Method handler : allHandlers) { if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) { bottomMostHandlers.add(handler); } } - MessageListener listenerMetadata = new MessageListener(target); + MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size()); // for each handler there will be no overriding method that specifies @Handler annotation // but an overriding method does inherit the listener configuration of the overwritten method for (Method handler : bottomMostHandlers) { - Handler handlerConfig = ReflectionUtils.getAnnotation( handler, Handler.class); + Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class); if (handlerConfig == null || !handlerConfig.enabled()) { continue; // disabled or invalid listeners are ignored } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 5a56660..d8e91c8 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -37,9 +37,8 @@ public class Subscription { private final Collection listeners; Subscription(MessageHandler handler) { -// this.listeners = new WeakConcurrentSet(); - this.listeners = new StrongConcurrentSet(); this.handlerMetadata = handler; + this.listeners = new StrongConcurrentSet(); IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); if (handler.isSynchronized()) { @@ -92,43 +91,43 @@ public class Subscription { Method handler = this.handlerMetadata.getHandler(); IHandlerInvocation invocation = this.invocation; - for (Object listener : listeners) { - try { + try { + for (Object listener : listeners) { invocation.invoke(listener, handler, message); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0]) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message)); } + } catch (IllegalAccessException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The class or method is not accessible") + .setCause(e) + .setMethodName(handler.getName()) +// .setListener(listener) + .setPublishedObject(message)); + } catch (IllegalArgumentException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0]) + .setCause(e) + .setMethodName(handler.getName()) +// .setListener(listener) + .setPublishedObject(message)); + } catch (InvocationTargetException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Message handler threw exception") + .setCause(e) + .setMethodName(handler.getName()) +// .setListener(listener) + .setPublishedObject(message)); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The handler code threw an exception") + .setCause(e) + .setMethodName(handler.getName()) +// .setListener(listener) + .setPublishedObject(message)); } } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index ad24782..559a7bd 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -40,7 +40,7 @@ public class SubscriptionManager { // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, Collection> subscriptionsPerMessageSingle; + private final ConcurrentHashMap, Collection> subscriptionsPerMessageSingle; private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type @@ -67,18 +67,10 @@ public class SubscriptionManager { private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); public SubscriptionManager(int numberOfThreads) { - this.MAP_STRIPING = 1; + this.MAP_STRIPING = numberOfThreads; this.LOAD_FACTOR = 0.8f; -// this.subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(4); -// this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); -// -// // only used during SUB/UNSUB -// this.subscriptionsPerListener = new IdentityHashMap, Collection>(4); -// -// this.superClassesCache = new IdentityHashMap, Collection>>(8); - - this.subscriptionsPerMessageSingle = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); + this.subscriptionsPerMessageSingle = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, 1); this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // only used during SUB/UNSUB @@ -103,7 +95,7 @@ public class SubscriptionManager { Class listenerClass = listener.getClass(); Collection subscriptions; boolean nothingLeft = true; - Lock UPDATE = this.LOCK.updateLock(); + Lock UPDATE = this.LOCK.writeLock(); try { UPDATE.lock(); @@ -168,10 +160,7 @@ public class SubscriptionManager { } if (nothingLeft) { - Lock WRITE = this.LOCK.writeLock(); - WRITE.lock(); this.subscriptionsPerListener.remove(listenerClass); - WRITE.unlock(); } } finally { @@ -181,6 +170,15 @@ public class SubscriptionManager { return; } + private final ThreadLocal> subInitialValue = new ThreadLocal>() { + @Override + protected java.util.Collection initialValue() { +// return new ArrayDeque(8); +// return Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, SubscriptionManager.this.LOAD_FACTOR)); +// return Collections.newSetFromMap(new ConcurrentHashMap(8, SubscriptionManager.this.LOAD_FACTOR, SubscriptionManager.this.MAP_STRIPING)); + return new StrongConcurrentSet(8, SubscriptionManager.this.LOAD_FACTOR); + } + }; // when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"? public void subscribe(Object listener) { @@ -192,9 +190,9 @@ public class SubscriptionManager { } Collection subscriptions; - Lock UPDATE = this.LOCK.updateLock(); + Lock WRITE = this.LOCK.writeLock(); try { - UPDATE.lock(); + WRITE.lock(); subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { @@ -203,19 +201,19 @@ public class SubscriptionManager { subscription.subscribe(listener); } } else { - Lock WRITE = this.LOCK.writeLock(); - try { - WRITE.lock(); // upgrade updatelock to write lock, Avoid DCL + // a listener is subscribed for the first time + Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); + int handlersSize = messageHandlers.size(); - // a listener is subscribed for the first time - Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); - if (messageHandlers.isEmpty()) { - // remember the class as non listening class if no handlers are found - this.nonListeners.put(listenerClass, this.holder); - return; - } - - subscriptions = new StrongConcurrentSet(8, this.LOAD_FACTOR); + if (handlersSize == 0) { + // remember the class as non listening class if no handlers are found + this.nonListeners.put(listenerClass, this.holder); + } else { + subscriptions = new StrongConcurrentSet(handlersSize, this.LOAD_FACTOR); +// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, this.MAP_STRIPING)); +// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1)); +// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR)); + this.subscriptionsPerListener.put(listenerClass, subscriptions); resetSuperClassSubs(); @@ -225,6 +223,11 @@ public class SubscriptionManager { Subscription subscription = new Subscription(messageHandler); subscription.subscribe(listener); + subscriptions.add(subscription); + + // + // save the subscription per message type + // // single or multi? Class[] handledMessageTypes = subscription.getHandledMessageTypes(); int size = handledMessageTypes.length; @@ -236,70 +239,70 @@ public class SubscriptionManager { Collection subs = this.subscriptionsPerMessageSingle.get(clazz); if (subs == null) { - // NOTE: Order is important for safe publication - subs = new StrongConcurrentSet(8, this.LOAD_FACTOR); - subs.add(subscription); - this.subscriptionsPerMessageSingle.put(clazz, subs); - - } else { - subs.add(subscription); + Collection putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get()); + if (putIfAbsent != null) { + subs = putIfAbsent; + } else { + subs = this.subInitialValue.get(); +// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1))); +// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR))); + this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); +// this.subInitialValue.set(new ArrayDeque(8)); + } } + subs.add(subscription); + if (acceptsSubtypes) { + // race conditions will result in duplicate answers, which we don't care about setupSuperClassCache(clazz); } } else { - // NOTE: Not thread-safe! must be synchronized in outer scope - IdentityObjectTree, Collection> tree; - - switch (size) { - case 2: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); - if (acceptsSubtypes) { - setupSuperClassCache(handledMessageTypes[0]); - setupSuperClassCache(handledMessageTypes[1]); - } - break; - } - case 3: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); - if (acceptsSubtypes) { - setupSuperClassCache(handledMessageTypes[0]); - setupSuperClassCache(handledMessageTypes[1]); - setupSuperClassCache(handledMessageTypes[2]); - } - break; - } - default: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); - if (acceptsSubtypes) { - for (Class c : handledMessageTypes) { - setupSuperClassCache(c); - } - } - break; - } - } - - Collection subs = tree.getValue(); - if (subs == null) { - subs = new StrongConcurrentSet(16, this.LOAD_FACTOR); - tree.putValue(subs); - } - subs.add(subscription); +// // NOTE: Not thread-safe! must be synchronized in outer scope +// IdentityObjectTree, Collection> tree; +// +// switch (size) { +// case 2: { +// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); +// if (acceptsSubtypes) { +// setupSuperClassCache(handledMessageTypes[0]); +// setupSuperClassCache(handledMessageTypes[1]); +// } +// break; +// } +// case 3: { +// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); +// if (acceptsSubtypes) { +// setupSuperClassCache(handledMessageTypes[0]); +// setupSuperClassCache(handledMessageTypes[1]); +// setupSuperClassCache(handledMessageTypes[2]); +// } +// break; +// } +// default: { +// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); +// if (acceptsSubtypes) { +// for (Class c : handledMessageTypes) { +// setupSuperClassCache(c); +// } +// } +// break; +// } +// } +// +// Collection subs = tree.getValue(); +// if (subs == null) { +// subs = new StrongConcurrentSet(16, this.LOAD_FACTOR); +// tree.putValue(subs); +// } +// subs.add(subscription); } - - subscriptions.add(subscription); } - - this.subscriptionsPerListener.put(listenerClass, subscriptions); - } finally { - WRITE.unlock(); } } } finally { - UPDATE.unlock(); + WRITE.unlock(); } } @@ -329,7 +332,6 @@ public class SubscriptionManager { } - // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType) { Map, Collection> superClassSubs = this.superClassSubscriptions; @@ -346,7 +348,8 @@ public class SubscriptionManager { return null; } - subsPerType = new StrongConcurrentSet(16, this.LOAD_FACTOR); +// subsPerType = new StrongConcurrentSet(types.size(), this.LOAD_FACTOR); + subsPerType = new ArrayDeque(types.size() + 1); for (Class superClass : types) { Collection subs = this.subscriptionsPerMessageSingle.get(superClass); @@ -485,13 +488,19 @@ public class SubscriptionManager { return subsPerType; } + + /** + * race conditions will result in duplicate answers, which we don't care if happens + */ private Collection> setupSuperClassCache(Class clazz) { Collection> types = this.superClassesCache.get(clazz); if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); - types = new ArrayDeque>(superTypes); +// types = new ArrayDeque>(superTypes); + types = new StrongConcurrentSet>(superTypes.size(), this.LOAD_FACTOR); + types.addAll(superTypes); // race conditions will result in duplicate answers, which we don't care about this.superClassesCache.put(clazz, types);