diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 8dc6e5a..a290452 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -1,5 +1,9 @@ package dorkbox.util.messagebus; +import dorkbox.util.messagebus.common.*; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; +import dorkbox.util.messagebus.subscription.Subscription; + import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -8,16 +12,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import dorkbox.util.messagebus.common.ConcurrentHashMapV8; -import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.SubscriptionUtils; -import dorkbox.util.messagebus.common.VarArgPossibility; -import dorkbox.util.messagebus.common.VarArgUtils; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; -import dorkbox.util.messagebus.listener.MessageHandler; -import dorkbox.util.messagebus.listener.MetadataReader; -import dorkbox.util.messagebus.subscription.Subscription; - /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known @@ -40,10 +34,7 @@ public class SubscriptionManager { private static final float LOAD_FACTOR = 0.8F; - // the metadata reader that is used to inspect objects passed to the subscribe method - private static final MetadataReader metadataReader = new MetadataReader(); - - final SubscriptionUtils utils; + private final SubscriptionUtils utils; // remember already processed classes that do not contain any message handlers private final Map, Boolean> nonListeners; @@ -66,21 +57,16 @@ public class SubscriptionManager { private final VarArgUtils varArgUtils; - // stripe size of maps for concurrency - private final int STRIPE_SIZE; - // private final StampedLock lock = new StampedLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); SubscriptionManager(int numberOfThreads) { - this.STRIPE_SIZE = numberOfThreads; - float loadFactor = SubscriptionManager.LOAD_FACTOR; // modified ONLY during SUB/UNSUB { - this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, loadFactor, this.STRIPE_SIZE); + this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, loadFactor, numberOfThreads); this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, ArrayList>(64, LOAD_FACTOR, 1); this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, loadFactor); @@ -93,7 +79,7 @@ public class SubscriptionManager { // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE); + this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); } public void shutdown() { @@ -150,13 +136,13 @@ public class SubscriptionManager { return; } - ArrayList subsPerListener = new ArrayList(); - Collection subsForPublication = null; VarArgPossibility varArgPossibility = this.varArgPossibility; Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + ArrayList subsPerListener = new ArrayList(handlersSize); + Collection subsForPublication = null; // create the subscription MessageHandler messageHandler; @@ -168,10 +154,10 @@ public class SubscriptionManager { Subscription subscription = new Subscription(messageHandler); subscription.subscribe(listener); + subsPerListener.add(subscription); // activates this sub for sub/unsub + // now add this subscription to each of the handled types subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); - - subsPerListener.add(subscription); // activates this sub for sub/unsub subsForPublication.add(subscription); // activates this sub for publication } @@ -195,7 +181,8 @@ public class SubscriptionManager { } // inside a write lock - private final Collection getSubsForPublication(final Class[] messageHandlerTypes, + // also puts it into the correct map if it's not already there + private Collection getSubsForPublication(final Class[] messageHandlerTypes, final Map, ArrayList> subsPerMessageSingle, final HashMapTree, ArrayList> subsPerMessageMulti, final VarArgPossibility varArgPossibility) { @@ -211,7 +198,7 @@ public class SubscriptionManager { case 1: { ArrayList subs = subsPerMessageSingle.get(type0); if (subs == null) { - subs = new ArrayList(8); + subs = new ArrayList(); boolean isArray = utils.isArray(type0); if (isArray) { @@ -219,6 +206,8 @@ public class SubscriptionManager { } // cache the super classes +// todo: makes it's own read/write lock. it's 2x as expensive when running inside the writelock for subscribe, VS on it's own +// maybe even use StampedLock utils.cacheSuperClasses(type0, isArray); subsPerMessageSingle.put(type0, subs); diff --git a/src/main/java/dorkbox/util/messagebus/listener/MessageHandler.java b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java similarity index 95% rename from src/main/java/dorkbox/util/messagebus/listener/MessageHandler.java rename to src/main/java/dorkbox/util/messagebus/common/MessageHandler.java index f3c2eab..d9e4d9c 100644 --- a/src/main/java/dorkbox/util/messagebus/listener/MessageHandler.java +++ b/src/main/java/dorkbox/util/messagebus/common/MessageHandler.java @@ -1,4 +1,4 @@ -package dorkbox.util.messagebus.listener; +package dorkbox.util.messagebus.common; import java.lang.reflect.Method; import java.util.ArrayList; @@ -8,7 +8,6 @@ import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Synchronized; -import dorkbox.util.messagebus.common.ReflectionUtils; /** * Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains diff --git a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java index c6abe25..5f8d7eb 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java @@ -70,7 +70,7 @@ public class ReflectionUtils { * @param from The root class to start with * @return A set of classes, each representing a super type of the root class */ - public static ArrayList> getSuperTypes(Class from) { + public static Class[] getSuperTypes(Class from) { ArrayList> superclasses = new ArrayList>(); collectInterfaces( from, superclasses ); @@ -81,7 +81,7 @@ public class ReflectionUtils { collectInterfaces( from, superclasses ); } - return superclasses; + return superclasses.toArray(new Class[superclasses.size()]); } public static void collectInterfaces( Class from, Collection> accumulator ) { diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java index 7012894..1eda3ea 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java @@ -3,8 +3,9 @@ package dorkbox.util.messagebus.common; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import dorkbox.util.messagebus.common.thread.ClassHolder; import dorkbox.util.messagebus.common.thread.ConcurrentSet; @@ -48,7 +49,7 @@ public class SubscriptionUtils { this.arrayVersionCache = new ConcurrentHashMapV8, Class>(32, loadFactor, stripeSize); this.isArrayCache = new ConcurrentHashMapV8, Boolean>(32, loadFactor, stripeSize); - this.superClassesCache = new ConcurrentHashMapV8, Class[]>(32, loadFactor, 1); + this.superClassesCache = new ConcurrentHashMapV8, Class[]>(32, loadFactor, 8); this.classHolderSingle = new ClassHolder(loadFactor, stripeSize); // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. @@ -64,39 +65,51 @@ public class SubscriptionUtils { this.superClassSubscriptions.clear(); } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + /** * never returns null * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) * * if parameter clazz is of type array, then the super classes are of array type as well * - * protected by read lock by caller + * protected by read lock by caller. The cache version is called first, by write lock */ public final Class[] getSuperClasses_NL(final Class clazz, final boolean isArray) { // this is never reset, since it never needs to be. final Map, Class[]> local = this.superClassesCache; + Class[] classes = local.get(clazz); if (classes == null) { // get all super types of class - final Collection> superTypes = ReflectionUtils.getSuperTypes(clazz); - ArrayList> newList = new ArrayList>(superTypes.size()); + final Class[] superTypes = ReflectionUtils.getSuperTypes(clazz); + int length = superTypes.length; + + ArrayList> newList = new ArrayList>(length); - Iterator> iterator; Class c; + if (isArray) { + for (int i=0;i[newList.size()]); local.put(clazz, classes); } @@ -199,17 +212,17 @@ public class SubscriptionUtils { * Cache the values of JNI method, isArray(c) * @return true if the class c is an array type */ - @SuppressWarnings("boxing") +// @SuppressWarnings("boxing") public final boolean isArray(final Class c) { - final Map, Boolean> isArrayCache = this.isArrayCache; - - final Boolean isArray = isArrayCache.get(c); - if (isArray == null) { +// final Map, Boolean> isArrayCache = this.isArrayCache; +// +// final Boolean isArray = isArrayCache.get(c); +// if (isArray == null) { boolean b = c.isArray(); - isArrayCache.put(c, b); +// isArrayCache.put(c, b); return b; - } - return isArray; +// } +// return isArray; } diff --git a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java b/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java deleted file mode 100644 index ddd5aef..0000000 --- a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java +++ /dev/null @@ -1,57 +0,0 @@ -package dorkbox.util.messagebus.listener; - -import java.lang.reflect.Method; -import java.util.ArrayList; - -import dorkbox.util.messagebus.annotations.Handler; -import dorkbox.util.messagebus.common.ReflectionUtils; - -/** - * The meta data reader is responsible for parsing and validating message handler configurations. - * - * @author bennidi - * Date: 11/16/12 - * @author dorkbox - * Date: 2/2/15 - */ -public class MetadataReader { - - // get all listeners defined by the given class (includes - // listeners defined in super classes) - public MessageHandler[] getMessageHandlers(final Class target) { - - // get all handlers (this will include all (inherited) methods directly annotated using @Handler) - final Method[] allMethods = ReflectionUtils.getMethods(target); - final int length = allMethods.length; - - final ArrayList finalMethods = new ArrayList(length); - Method method; - - for (int i=0;i expectedHandler: this.handlers.entrySet()) { NClasses key = expectedHandler.getKey(); - List handlers2 = getHandlers(listener, key.messageTypes); + List handlers2 = pruneHandlers(allHandlers, key.messageTypes); if (expectedHandler.getValue() > 0){ assertTrue(!handlers2.isEmpty()); @@ -135,9 +130,10 @@ public class MetadataReaderTest extends AssertSupport { } // for testing - public List getHandlers(MessageListener listener, Class... messageTypes) { + public List pruneHandlers(MessageHandler[] allHandlers, Class... messageTypes) { List matching = new LinkedList(); - for (MessageHandler handler : listener.getHandlers()) { + + for (MessageHandler handler : allHandlers) { if (handlesMessage(handler, messageTypes)) { matching.add(handler); } @@ -198,7 +194,7 @@ public class MetadataReaderTest extends AssertSupport { @Test public void testMultipleSignatureListenerWithoutInheritance() { - MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class, 0.85F, 4); + MessageHandler[] allHandlers = MessageHandler.get(MultiMessageListener1.class); ListenerValidator validator = new ListenerValidator() .expectHandlers(7, String.class) .expectHandlers(9, String.class, String.class) @@ -211,7 +207,7 @@ public class MetadataReaderTest extends AssertSupport { .expectHandlers(2, String.class, Object.class) .expectHandlers(2, String.class, Object[].class) ; - validator.check(listener); + validator.check(allHandlers); } @SuppressWarnings("unused") diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java index 740ea64..e6530c6 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java @@ -13,13 +13,11 @@ import java.util.concurrent.LinkedTransferQueue; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; +import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2; import dorkbox.util.messagebus.common.thread.ConcurrentSet; -import dorkbox.util.messagebus.listener.MessageHandler; -import dorkbox.util.messagebus.listener.MessageListener; -import dorkbox.util.messagebus.listener.MetadataReader; import dorkbox.util.messagebus.subscription.Subscription; @@ -28,7 +26,9 @@ public class PerfTest_Collections { public static final Integer TEST_VALUE = Integer.valueOf(777); private static final float LOAD_FACTOR = 0.8F; - private static final MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class, LOAD_FACTOR, 8); + + + private static final MessageHandler[] allHandlers = MessageHandler.get(Listener.class); public static void main(final String[] args) throws Exception { final int size = 16; @@ -73,10 +73,8 @@ public class PerfTest_Collections { final int warmupRuns = 2; final int runs = 3; - Collection handlers = messageListener.getHandlers(); - for (int i=0;i listener){ - return this.handlerMetadata.isFromListener(listener); + public boolean belongsTo(Subscription subscription, Class listener) { - // only in unit test - public boolean isFromListener(Class listener){ - return this.listenerConfig.isFromListener(listener); - } - +// return this.handlerMetadata.isFromListener(listener); +// +// +// // only in unit test +// public boolean isFromListener(Class listener){ +// return this.listenerConfig.isFromListener(listener); +// } + return false; } // only in unit test - public boolean isFromListener(Class listener) { - return this.listenerDefinition.equals(listener); - } +// public boolean isFromListener(Class listener) { +// return this.listenerDefinition.equals(listener); +// } private Collection getEntries(Class messageType){