WIP - now have superclass path enabled. @ 192 ns/op

This commit is contained in:
nathan 2015-02-22 02:39:08 +01:00
parent 3315f079d5
commit 0171a5558c
6 changed files with 95 additions and 106 deletions

View File

@ -1,11 +1,8 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
@ -152,13 +149,17 @@ public class MultiMBassador implements IMessageBus {
} }
} }
FastEntrySet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass); // FastEntrySet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses // now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) { if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
ObjectIterator<Entry<Subscription>> fastIterator = superSubscriptions.fastIterator(); // ObjectIterator<Entry<Subscription>> fastIterator = superSubscriptions.fastIterator();
Iterator<Subscription> fastIterator = superSubscriptions.iterator();
while (fastIterator.hasNext()) { while (fastIterator.hasNext()) {
Subscription sub = fastIterator.next().getKey(); // Subscription sub = fastIterator.next().getKey();
Subscription sub = fastIterator.next();
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message); sub.publishToSubscription(this, message);
} }

View File

@ -1,8 +1,7 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -24,8 +23,9 @@ public class IdentityObjectTree<KEY, VALUE> {
// can be overridded to provide a custom backing map // can be overridded to provide a custom backing map
protected Map<KEY, IdentityObjectTree<KEY, VALUE>> createChildren() { protected Map<KEY, IdentityObjectTree<KEY, VALUE>> createChildren() {
// return new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f, 1); //TODO: this needs to be concurrent?!?!?
return new Reference2ReferenceOpenHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f); return new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f, 1);
// return new Reference2ReferenceOpenHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f);
} }
public VALUE getValue() { public VALUE getValue() {

View File

@ -1,13 +1,12 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import it.unimi.dsi.fastutil.objects.Object2BooleanOpenHashMap;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement; import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
@ -127,8 +126,19 @@ public class ReflectionUtils
return null; return null;
} }
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){ private static final ThreadLocal<Collection<AnnotatedElement>> annotationCollector = new ThreadLocal<Collection<AnnotatedElement>>() {
return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap<AnnotatedElement>(8, 0.8F))); @Override
protected Collection<AnnotatedElement> initialValue() {
return Collections.newSetFromMap(new ConcurrentHashMap<AnnotatedElement, Boolean>(8, .8F, 1));
}
};
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType) {
Collection<AnnotatedElement> collection = annotationCollector.get();
A annotation = getAnnotation(from, annotationType, collection);
collection.clear();
return annotation;
} }
// //

View File

@ -18,13 +18,11 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public StrongConcurrentSet(int size, float loadFactor) { public StrongConcurrentSet(int size, float loadFactor) {
super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1)); super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1));
// super(new Reference2ReferenceOpenHashMap<T, ISetEntry<T>>(size, loadFactor));
} }
@Override @Override
public Iterator<T> iterator() { public Iterator<T> iterator() {
return new Iterator<T>() { return new Iterator<T>() {
private ISetEntry<T> current = StrongConcurrentSet.this.head; private ISetEntry<T> current = StrongConcurrentSet.this.head;
@Override @Override

View File

@ -2,12 +2,14 @@ package net.engio.mbassy.multi.subscription;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.IHandlerInvocation;
import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.multi.error.ErrorHandlingSupport; import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MessageHandler;
import com.esotericsoftware.reflectasm.MethodAccess; import com.esotericsoftware.reflectasm.MethodAccess;
@ -28,6 +30,9 @@ import com.esotericsoftware.reflectasm.MethodAccess;
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class Subscription { public class Subscription {
private static AtomicInteger ID_COUNTER = new AtomicInteger();
private final int ID = ID_COUNTER.getAndIncrement();
// the handler's metadata -> for each handler in a listener, a unique subscription context is created // the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata; private final MessageHandler handlerMetadata;
@ -47,22 +52,14 @@ public class Subscription {
this.invocation = invocation; this.invocation = invocation;
} }
/** public Class<?>[] getHandledMessageTypes() {
* Check whether this subscription manages a message handler of the given message listener class return this.handlerMetadata.getHandledMessages();
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
} }
public boolean acceptsSubtypes() { public boolean acceptsSubtypes() {
return this.handlerMetadata.acceptsSubtypes(); return this.handlerMetadata.acceptsSubtypes();
} }
public Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
public void subscribe(Object listener) { public void subscribe(Object listener) {
this.listeners.add(listener); this.listeners.add(listener);
} }
@ -74,8 +71,12 @@ public class Subscription {
return this.listeners.remove(existingListener); return this.listeners.remove(existingListener);
} }
public boolean isEmpty() { /**
return this.listeners.isEmpty(); * Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
} }
// only used in unit-test // only used in unit-test
@ -96,41 +97,41 @@ public class Subscription {
invocation.invoke(listener, handler, handleIndex, message); invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
e.printStackTrace(); e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError() errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " + .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible") "The class or method is not accessible")
// .setCause(e) .setCause(e)
// .setMethodName(handler.getName()) .setMethodName(handler.getMethodNames()[handleIndex])
//// .setListener(listener) .setListener(listener)
// .setPublishedObject(message)); .setPublishedObject(message));
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
e.printStackTrace(); e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError() errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " + .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " + message.getClass() "Wrong arguments passed to method. Was: " + message.getClass()
// + "Expected: " + handler.getParameterTypes()[0]) + "Expected: " + handler.getParameterTypes()[0])
// .setCause(e) .setCause(e)
// .setMethodName(handler.getName()) .setMethodName(handler.getMethodNames()[handleIndex])
//// .setListener(listener) .setListener(listener)
// .setPublishedObject(message)); .setPublishedObject(message));
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
e.printStackTrace(); e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError() errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " + .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception") "Message handler threw exception")
// .setCause(e) .setCause(e)
// .setMethodName(handler.getName()) .setMethodName(handler.getMethodNames()[handleIndex])
//// .setListener(listener) .setListener(listener)
// .setPublishedObject(message)); .setPublishedObject(message));
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
// errorHandler.handlePublicationError(new PublicationError() errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " + .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception") "The handler code threw an exception")
// .setCause(e) .setCause(e)
// .setMethodName(handler.getName()) .setMethodName(handler.getMethodNames()[handleIndex])
//// .setListener(listener) .setListener(listener)
// .setPublishedObject(message)); .setPublishedObject(message));
} }
} }
} }
@ -292,10 +293,7 @@ public class Subscription {
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; return this.ID;
int result = 1;
result = prime * result + (this.handlerMetadata == null ? 0 : this.handlerMetadata.hashCode());
return result;
} }
@Override @Override
@ -310,13 +308,6 @@ public class Subscription {
return false; return false;
} }
Subscription other = (Subscription) obj; Subscription other = (Subscription) obj;
if (this.handlerMetadata == null) { return this.ID == other.ID;
if (other.handlerMetadata != null) {
return false;
}
} else if (!this.handlerMetadata.equals(other.handlerMetadata)) {
return false;
}
return true;
} }
} }

View File

@ -1,10 +1,5 @@
package net.engio.mbassy.multi.subscription; package net.engio.mbassy.multi.subscription;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -34,7 +29,7 @@ public class SubscriptionManager {
private float LOAD_FACTOR; private float LOAD_FACTOR;
// the metadata reader that is used to inspect objects passed to the subscribe method // the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader = new MetadataReader(); private static final MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
@ -50,12 +45,12 @@ public class SubscriptionManager {
private final Object holder = new Object[0]; private final Object holder = new Object[0];
private final Map<Class<?>, FastEntrySet<Class<?>>> superClassesCache; private final Map<Class<?>, Set<Class<?>>> superClassesCache;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private Map<Class<?>, FastEntrySet<Subscription>> superClassSubscriptions; private Map<Class<?>, Set<Subscription>> superClassSubscriptions;
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>(); // private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
@ -73,10 +68,10 @@ public class SubscriptionManager {
// only used during SUB/UNSUB // only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.superClassesCache = new ConcurrentHashMap<Class<?>, FastEntrySet<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.superClassesCache = new ConcurrentHashMap<Class<?>, Set<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance on handlers // it's a hit on SUB/UNSUB, but improves performance on handlers
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, FastEntrySet<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Set<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
} }
@ -172,7 +167,7 @@ public class SubscriptionManager {
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass); Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions == null) { if (subscriptions == null) {
// a listener is subscribed for the first time // a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size(); int handlersSize = messageHandlers.size();
if (handlersSize == 0) { if (handlersSize == 0) {
@ -347,65 +342,61 @@ public class SubscriptionManager {
// must be protected by read lock // must be protected by read lock
// CAN RETURN NULL - not thread safe. // CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType); return this.subscriptionsPerMessageSingle.get(messageType);
} }
// must be protected by read lock // must be protected by read lock
// CAN RETURN NULL - not thread safe. // CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) { public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2); return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2);
} }
// must be protected by read lock // must be protected by read lock
// CAN RETURN NULL - not thread safe. // CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) { public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
} }
// must be protected by read lock // must be protected by read lock
// CAN RETURN NULL - not thread safe. // CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) { public final Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
return this.subscriptionsPerMessageMulti.getValue(messageTypes); return this.subscriptionsPerMessageMulti.getValue(messageTypes);
} }
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public FastEntrySet<Subscription> getSuperSubscriptions(Class<?> superType) { public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, FastEntrySet<Subscription>> superClassSubs = this.superClassSubscriptions; // whenever our subscriptions change, this map is cleared.
if (superClassSubs == null) { Set<Subscription> subsPerType = this.superClassSubscriptions.get(superType);
// we haven't created it yet (via subscribe)
return null;
}
FastEntrySet<Subscription> subsPerType = superClassSubs.get(superType);
if (subsPerType == null) { if (subsPerType == null) {
FastEntrySet<Class<?>> types = this.superClassesCache.get(superType); // this caches our class hierarchy. This is never cleared.
Set<Class<?>> types = this.superClassesCache.get(superType);
if (types == null || types.isEmpty()) { if (types == null || types.isEmpty()) {
return null; return null;
} }
Reference2BooleanArrayMap<Subscription> map = new Reference2BooleanArrayMap<Subscription>(types.size() + 1); subsPerType = new StrongConcurrentSet<Subscription>(types.size() + 1, this.LOAD_FACTOR);
ObjectIterator<Entry<Class<?>>> fastIterator = types.fastIterator(); Iterator<Class<?>> iterator = types.iterator();
while (fastIterator.hasNext()) { while (iterator.hasNext()) {
Class<?> superClass = fastIterator.next().getKey(); Class<?> superClass = iterator.next();
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
if (subs != null && !subs.isEmpty()) { if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) { if (sub.acceptsSubtypes()) {
map.put(sub, Boolean.TRUE); subsPerType.add(sub);
} }
} }
} }
} }
subsPerType = map.reference2BooleanEntrySet(); // cache our subscriptions for super classes, so that their access can be fast!
superClassSubs.put(superType, subsPerType); this.superClassSubscriptions.put(superType, subsPerType);
} }
return subsPerType; return subsPerType;
@ -539,15 +530,13 @@ public class SubscriptionManager {
if (!this.superClassesCache.containsKey(clazz)) { if (!this.superClassesCache.containsKey(clazz)) {
// it doesn't matter if concurrent access stomps on values, since they are always the same. // it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
Reference2BooleanArrayMap<Class<?>> map = new Reference2BooleanArrayMap<Class<?>>(superTypes.size() + 1); StrongConcurrentSet<Class<?>> set = new StrongConcurrentSet<Class<?>>(superTypes.size() + 1, this.LOAD_FACTOR);
for (Class<?> c : superTypes) { for (Class<?> c : superTypes) {
map.put(c, Boolean.TRUE); set.add(c);
} }
FastEntrySet<Class<?>> fastSet = map.reference2BooleanEntrySet();
// race conditions will result in duplicate answers, which we don't care about // race conditions will result in duplicate answers, which we don't care about
this.superClassesCache.put(clazz, fastSet); this.superClassesCache.put(clazz, set);
} }
} }