diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 6fdb73b..3aefc5c 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,11 +1,8 @@ package net.engio.mbassy.multi; -import it.unimi.dsi.fastutil.objects.ObjectIterator; -import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry; -import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; - import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; @@ -152,13 +149,17 @@ public class MultiMBassador implements IMessageBus { } } - FastEntrySet superSubscriptions = manager.getSuperSubscriptions(messageClass); +// FastEntrySet superSubscriptions = manager.getSuperSubscriptions(messageClass); + Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses if (superSubscriptions != null && !superSubscriptions.isEmpty()) { - ObjectIterator> fastIterator = superSubscriptions.fastIterator(); +// ObjectIterator> fastIterator = superSubscriptions.fastIterator(); + Iterator fastIterator = superSubscriptions.iterator(); while (fastIterator.hasNext()) { - Subscription sub = fastIterator.next().getKey(); +// Subscription sub = fastIterator.next().getKey(); + Subscription sub = fastIterator.next(); + // this catches all exception types sub.publishToSubscription(this, message); } diff --git a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java index a4e30d0..334159e 100644 --- a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java +++ b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java @@ -1,8 +1,7 @@ package net.engio.mbassy.multi.common; -import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap; - import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** @@ -24,8 +23,9 @@ public class IdentityObjectTree { // can be overridded to provide a custom backing map protected Map> createChildren() { -// return new ConcurrentHashMap>(2, 0.75f, 1); - return new Reference2ReferenceOpenHashMap>(2, 0.75f); +//TODO: this needs to be concurrent?!?!? + return new ConcurrentHashMap>(2, 0.75f, 1); +// return new Reference2ReferenceOpenHashMap>(2, 0.75f); } public VALUE getValue() { diff --git a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java index da9e88f..3dc94b5 100644 --- a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java @@ -1,13 +1,12 @@ package net.engio.mbassy.multi.common; -import it.unimi.dsi.fastutil.objects.Object2BooleanOpenHashMap; - import java.lang.annotation.Annotation; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.Collection; import java.util.Collections; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import net.engio.mbassy.multi.annotations.Handler; @@ -127,8 +126,19 @@ public class ReflectionUtils return null; } - public static A getAnnotation( AnnotatedElement from, Class annotationType){ - return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap(8, 0.8F))); + private static final ThreadLocal> annotationCollector = new ThreadLocal>() { + @Override + protected Collection initialValue() { + return Collections.newSetFromMap(new ConcurrentHashMap(8, .8F, 1)); + } + }; + + public static A getAnnotation( AnnotatedElement from, Class annotationType) { + Collection collection = annotationCollector.get(); + + A annotation = getAnnotation(from, annotationType, collection); + collection.clear(); + return annotation; } // diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index b54b195..f70c251 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -18,13 +18,11 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet(int size, float loadFactor) { super(new ConcurrentHashMap>(size, loadFactor, 1)); -// super(new Reference2ReferenceOpenHashMap>(size, loadFactor)); } @Override public Iterator iterator() { return new Iterator() { - private ISetEntry current = StrongConcurrentSet.this.head; @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 6d1513c..f998dc3 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -2,12 +2,14 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.InvocationTargetException; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.multi.error.ErrorHandlingSupport; +import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.listener.MessageHandler; import com.esotericsoftware.reflectasm.MethodAccess; @@ -28,6 +30,9 @@ import com.esotericsoftware.reflectasm.MethodAccess; * Date: 2/2/15 */ public class Subscription { + private static AtomicInteger ID_COUNTER = new AtomicInteger(); + private final int ID = ID_COUNTER.getAndIncrement(); + // the handler's metadata -> for each handler in a listener, a unique subscription context is created private final MessageHandler handlerMetadata; @@ -47,22 +52,14 @@ public class Subscription { this.invocation = invocation; } - /** - * Check whether this subscription manages a message handler of the given message listener class - */ - // only in unit test - public boolean belongsTo(Class listener){ - return this.handlerMetadata.isFromListener(listener); + public Class[] getHandledMessageTypes() { + return this.handlerMetadata.getHandledMessages(); } public boolean acceptsSubtypes() { return this.handlerMetadata.acceptsSubtypes(); } - public Class[] getHandledMessageTypes() { - return this.handlerMetadata.getHandledMessages(); - } - public void subscribe(Object listener) { this.listeners.add(listener); } @@ -74,8 +71,12 @@ public class Subscription { return this.listeners.remove(existingListener); } - public boolean isEmpty() { - return this.listeners.isEmpty(); + /** + * Check whether this subscription manages a message handler of the given message listener class + */ + // only in unit test + public boolean belongsTo(Class listener){ + return this.handlerMetadata.isFromListener(listener); } // only used in unit-test @@ -96,41 +97,41 @@ public class Subscription { invocation.invoke(listener, handler, handleIndex, message); } catch (IllegalAccessException e) { e.printStackTrace(); - // errorHandler.handlePublicationError(new PublicationError() - // .setMessage("Error during invocation of message handler. " + - // "The class or method is not accessible") - // .setCause(e) - // .setMethodName(handler.getName()) - //// .setListener(listener) - // .setPublishedObject(message)); + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The class or method is not accessible") + .setCause(e) + .setMethodName(handler.getMethodNames()[handleIndex]) + .setListener(listener) + .setPublishedObject(message)); } catch (IllegalArgumentException e) { e.printStackTrace(); - // errorHandler.handlePublicationError(new PublicationError() - // .setMessage("Error during invocation of message handler. " + - // "Wrong arguments passed to method. Was: " + message.getClass() - // + "Expected: " + handler.getParameterTypes()[0]) - // .setCause(e) - // .setMethodName(handler.getName()) - //// .setListener(listener) - // .setPublishedObject(message)); + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0]) + .setCause(e) + .setMethodName(handler.getMethodNames()[handleIndex]) + .setListener(listener) + .setPublishedObject(message)); } catch (InvocationTargetException e) { e.printStackTrace(); - // errorHandler.handlePublicationError(new PublicationError() - // .setMessage("Error during invocation of message handler. " + - // "Message handler threw exception") - // .setCause(e) - // .setMethodName(handler.getName()) - //// .setListener(listener) - // .setPublishedObject(message)); + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Message handler threw exception") + .setCause(e) + .setMethodName(handler.getMethodNames()[handleIndex]) + .setListener(listener) + .setPublishedObject(message)); } catch (Throwable e) { e.printStackTrace(); - // errorHandler.handlePublicationError(new PublicationError() - // .setMessage("Error during invocation of message handler. " + - // "The handler code threw an exception") - // .setCause(e) - // .setMethodName(handler.getName()) - //// .setListener(listener) - // .setPublishedObject(message)); + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The handler code threw an exception") + .setCause(e) + .setMethodName(handler.getMethodNames()[handleIndex]) + .setListener(listener) + .setPublishedObject(message)); } } } @@ -292,10 +293,7 @@ public class Subscription { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (this.handlerMetadata == null ? 0 : this.handlerMetadata.hashCode()); - return result; + return this.ID; } @Override @@ -310,13 +308,6 @@ public class Subscription { return false; } Subscription other = (Subscription) obj; - if (this.handlerMetadata == null) { - if (other.handlerMetadata != null) { - return false; - } - } else if (!this.handlerMetadata.equals(other.handlerMetadata)) { - return false; - } - return true; + return this.ID == other.ID; } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 797f363..687e17f 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -1,10 +1,5 @@ package net.engio.mbassy.multi.subscription; -import it.unimi.dsi.fastutil.objects.ObjectIterator; -import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap; -import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry; -import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; - import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -34,7 +29,7 @@ public class SubscriptionManager { private float LOAD_FACTOR; // the metadata reader that is used to inspect objects passed to the subscribe method - private final MetadataReader metadataReader = new MetadataReader(); + private static final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message @@ -50,12 +45,12 @@ public class SubscriptionManager { private final Object holder = new Object[0]; - private final Map, FastEntrySet>> superClassesCache; + private final Map, Set>> superClassesCache; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private Map, FastEntrySet> superClassSubscriptions; + private Map, Set> superClassSubscriptions; // private final IdentityObjectTree, Collection> superClassSubscriptionsMulti = new IdentityObjectTree, Collection>(); @@ -73,10 +68,10 @@ public class SubscriptionManager { // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); - this.superClassesCache = new ConcurrentHashMap, FastEntrySet>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassesCache = new ConcurrentHashMap, Set>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance on handlers - this.superClassSubscriptions = new ConcurrentHashMap, FastEntrySet>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassSubscriptions = new ConcurrentHashMap, Set>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.nonListeners = new ConcurrentHashMap, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); } @@ -172,7 +167,7 @@ public class SubscriptionManager { Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions == null) { // a listener is subscribed for the first time - Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); + Collection messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); int handlersSize = messageHandlers.size(); if (handlersSize == 0) { @@ -347,65 +342,61 @@ public class SubscriptionManager { // must be protected by read lock // CAN RETURN NULL - not thread safe. - public Collection getSubscriptionsByMessageType(Class messageType) { + public final Collection getSubscriptionsByMessageType(Class messageType) { return this.subscriptionsPerMessageSingle.get(messageType); } // must be protected by read lock // CAN RETURN NULL - not thread safe. - public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { + public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2); } // must be protected by read lock // CAN RETURN NULL - not thread safe. - public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { + public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); } // must be protected by read lock // CAN RETURN NULL - not thread safe. - public Collection getSubscriptionsByMessageType(Class... messageTypes) { + public final Collection getSubscriptionsByMessageType(Class... messageTypes) { return this.subscriptionsPerMessageMulti.getValue(messageTypes); } // ALSO checks to see if the superClass accepts subtypes. - public FastEntrySet getSuperSubscriptions(Class superType) { - Map, FastEntrySet> superClassSubs = this.superClassSubscriptions; - if (superClassSubs == null) { - // we haven't created it yet (via subscribe) - return null; - } - - FastEntrySet subsPerType = superClassSubs.get(superType); + public final Collection getSuperSubscriptions(Class superType) { + // whenever our subscriptions change, this map is cleared. + Set subsPerType = this.superClassSubscriptions.get(superType); if (subsPerType == null) { - FastEntrySet> types = this.superClassesCache.get(superType); + // this caches our class hierarchy. This is never cleared. + Set> types = this.superClassesCache.get(superType); if (types == null || types.isEmpty()) { return null; } - Reference2BooleanArrayMap map = new Reference2BooleanArrayMap(types.size() + 1); + subsPerType = new StrongConcurrentSet(types.size() + 1, this.LOAD_FACTOR); - ObjectIterator>> fastIterator = types.fastIterator(); - while (fastIterator.hasNext()) { - Class superClass = fastIterator.next().getKey(); + Iterator> iterator = types.iterator(); + while (iterator.hasNext()) { + Class superClass = iterator.next(); Collection subs = this.subscriptionsPerMessageSingle.get(superClass); if (subs != null && !subs.isEmpty()) { for (Subscription sub : subs) { if (sub.acceptsSubtypes()) { - map.put(sub, Boolean.TRUE); + subsPerType.add(sub); } } } } - subsPerType = map.reference2BooleanEntrySet(); - superClassSubs.put(superType, subsPerType); + // cache our subscriptions for super classes, so that their access can be fast! + this.superClassSubscriptions.put(superType, subsPerType); } return subsPerType; @@ -539,15 +530,13 @@ public class SubscriptionManager { if (!this.superClassesCache.containsKey(clazz)) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); - Reference2BooleanArrayMap> map = new Reference2BooleanArrayMap>(superTypes.size() + 1); + StrongConcurrentSet> set = new StrongConcurrentSet>(superTypes.size() + 1, this.LOAD_FACTOR); for (Class c : superTypes) { - map.put(c, Boolean.TRUE); + set.add(c); } - FastEntrySet> fastSet = map.reference2BooleanEntrySet(); - // race conditions will result in duplicate answers, which we don't care about - this.superClassesCache.put(clazz, fastSet); + this.superClassesCache.put(clazz, set); } }