Good performance

This commit is contained in:
nathan 2015-05-31 22:15:55 +02:00
parent bbade8aa72
commit ab32656ce1
9 changed files with 94 additions and 148 deletions

View File

@ -1,5 +1,9 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@ -8,16 +12,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.SubscriptionUtils;
import dorkbox.util.messagebus.common.VarArgPossibility;
import dorkbox.util.messagebus.common.VarArgUtils;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
@ -40,10 +34,7 @@ public class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
// the metadata reader that is used to inspect objects passed to the subscribe method
private static final MetadataReader metadataReader = new MetadataReader();
final SubscriptionUtils utils;
private final SubscriptionUtils utils;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
@ -66,21 +57,16 @@ public class SubscriptionManager {
private final VarArgUtils varArgUtils;
// stripe size of maps for concurrency
private final int STRIPE_SIZE;
// private final StampedLock lock = new StampedLock();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads;
float loadFactor = SubscriptionManager.LOAD_FACTOR;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, this.STRIPE_SIZE);
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, numberOfThreads);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(64, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
@ -93,7 +79,7 @@ public class SubscriptionManager {
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE);
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads);
}
public void shutdown() {
@ -150,13 +136,13 @@ public class SubscriptionManager {
return;
}
ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>();
Collection<Subscription> subsForPublication = null;
VarArgPossibility varArgPossibility = this.varArgPossibility;
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>(handlersSize);
Collection<Subscription> subsForPublication = null;
// create the subscription
MessageHandler messageHandler;
@ -168,10 +154,10 @@ public class SubscriptionManager {
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
subsPerListener.add(subscription); // activates this sub for sub/unsub
// now add this subscription to each of the handled types
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
subsPerListener.add(subscription); // activates this sub for sub/unsub
subsForPublication.add(subscription); // activates this sub for publication
}
@ -195,7 +181,8 @@ public class SubscriptionManager {
}
// inside a write lock
private final Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
// also puts it into the correct map if it's not already there
private Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final VarArgPossibility varArgPossibility) {
@ -211,7 +198,7 @@ public class SubscriptionManager {
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>(8);
subs = new ArrayList<Subscription>();
boolean isArray = utils.isArray(type0);
if (isArray) {
@ -219,6 +206,8 @@ public class SubscriptionManager {
}
// cache the super classes
// todo: makes it's own read/write lock. it's 2x as expensive when running inside the writelock for subscribe, VS on it's own
// maybe even use StampedLock
utils.cacheSuperClasses(type0, isArray);
subsPerMessageSingle.put(type0, subs);

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus.listener;
package dorkbox.util.messagebus.common;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -8,7 +8,6 @@ import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.ReflectionUtils;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains

View File

@ -70,7 +70,7 @@ public class ReflectionUtils {
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static ArrayList<Class<?>> getSuperTypes(Class<?> from) {
public static Class<?>[] getSuperTypes(Class<?> from) {
ArrayList<Class<?>> superclasses = new ArrayList<Class<?>>();
collectInterfaces( from, superclasses );
@ -81,7 +81,7 @@ public class ReflectionUtils {
collectInterfaces( from, superclasses );
}
return superclasses;
return superclasses.toArray(new Class<?>[superclasses.size()]);
}
public static void collectInterfaces( Class<?> from, Collection<Class<?>> accumulator ) {

View File

@ -3,8 +3,9 @@ package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
@ -48,7 +49,7 @@ public class SubscriptionUtils {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 1);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 8);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
@ -64,39 +65,51 @@ public class SubscriptionUtils {
this.superClassSubscriptions.clear();
}
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*
* if parameter clazz is of type array, then the super classes are of array type as well
*
* protected by read lock by caller
* protected by read lock by caller. The cache version is called first, by write lock
*/
public final Class<?>[] getSuperClasses_NL(final Class<?> clazz, final boolean isArray) {
// this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
if (classes == null) {
// get all super types of class
final Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(superTypes.size());
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
int length = superTypes.length;
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(length);
Iterator<Class<?>> iterator;
Class<?> c;
if (isArray) {
for (int i=0;i<length;i++) {
c = superTypes[i];
for (iterator = superTypes.iterator(); iterator.hasNext();) {
c = iterator.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
newList.add(c);
if (c != clazz) {
newList.add(c);
}
}
} else {
for (int i=0;i<length;i++) {
c = superTypes[i];
if (c != clazz) {
newList.add(c);
}
}
}
classes = newList.toArray(new Class<?>[newList.size()]);
local.put(clazz, classes);
}
@ -199,17 +212,17 @@ public class SubscriptionUtils {
* Cache the values of JNI method, isArray(c)
* @return true if the class c is an array type
*/
@SuppressWarnings("boxing")
// @SuppressWarnings("boxing")
public final boolean isArray(final Class<?> c) {
final Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
final Boolean isArray = isArrayCache.get(c);
if (isArray == null) {
// final Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
//
// final Boolean isArray = isArrayCache.get(c);
// if (isArray == null) {
boolean b = c.isArray();
isArrayCache.put(c, b);
// isArrayCache.put(c, b);
return b;
}
return isArray;
// }
// return isArray;
}

View File

@ -1,57 +0,0 @@
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.ArrayList;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ReflectionUtils;
/**
* The meta data reader is responsible for parsing and validating message handler configurations.
*
* @author bennidi
* Date: 11/16/12
* @author dorkbox
* Date: 2/2/15
*/
public class MetadataReader {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public MessageHandler[] getMessageHandlers(final Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
Method method;
for (int i=0;i<length;i++) {
method = allMethods[i];
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
if (!ReflectionUtils.containsOverridingMethod(allMethods, method)) {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
final Handler handler = ReflectionUtils.getAnnotation(method, Handler.class);
if (handler == null || !handler.enabled()) {
// disabled or invalid listeners are ignored
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
if (overriddenHandler == null) {
overriddenHandler = method;
}
// if a handler is overwritten it inherits the configuration of its parent method
finalMethods.add(new MessageHandler(overriddenHandler, handler));
}
}
MessageHandler[] array = finalMethods.toArray(new MessageHandler[finalMethods.size()]);
return array;
}
}

View File

@ -8,12 +8,12 @@ import org.omg.CORBA.BooleanHolder;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.listener.MessageHandler;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered

View File

@ -11,9 +11,7 @@ import org.junit.Test;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MessageListener;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.common.MessageHandler;
/**
*
@ -22,16 +20,14 @@ import dorkbox.util.messagebus.listener.MetadataReader;
*/
public class MetadataReaderTest extends AssertSupport {
private MetadataReader reader = new MetadataReader();
@Test
public void testListenerWithoutInheritance() {
MessageListener listener = this.reader.getMessageListener(MessageListener1.class, 0.85F, 4);
MessageHandler[] allHandlers = MessageHandler.get(MessageListener1.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
.expectHandlers(1, BufferedReader.class);
validator.check(listener);
validator.check(allHandlers);
}
/*
@ -45,23 +41,22 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithInheritance() {
MessageListener listener = this.reader.getMessageListener(MessageListener2.class, 0.85F, 4);
MessageHandler[] allHandlers = MessageHandler.get(MessageListener2.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
.expectHandlers(1, BufferedReader.class);
validator.check(listener);
validator.check(allHandlers);
}
@Test
public void testListenerWithInheritanceOverriding() {
MessageListener listener = this.reader.getMessageListener(MessageListener3.class, 0.85F, 4);
MessageHandler[] allHandlers = MessageHandler.get(MessageListener3.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(0, String.class)
.expectHandlers(2, Object.class)
.expectHandlers(0, BufferedReader.class);
validator.check(listener);
validator.check(allHandlers);
}
public static class NClasses {
@ -119,10 +114,10 @@ public class MetadataReaderTest extends AssertSupport {
return this;
}
public void check(MessageListener listener){
public void check(MessageHandler[] allHandlers){
for (Map.Entry<NClasses, Integer> expectedHandler: this.handlers.entrySet()) {
NClasses key = expectedHandler.getKey();
List<MessageHandler> handlers2 = getHandlers(listener, key.messageTypes);
List<MessageHandler> handlers2 = pruneHandlers(allHandlers, key.messageTypes);
if (expectedHandler.getValue() > 0){
assertTrue(!handlers2.isEmpty());
@ -135,9 +130,10 @@ public class MetadataReaderTest extends AssertSupport {
}
// for testing
public List<MessageHandler> getHandlers(MessageListener listener, Class<?>... messageTypes) {
public List<MessageHandler> pruneHandlers(MessageHandler[] allHandlers, Class<?>... messageTypes) {
List<MessageHandler> matching = new LinkedList<MessageHandler>();
for (MessageHandler handler : listener.getHandlers()) {
for (MessageHandler handler : allHandlers) {
if (handlesMessage(handler, messageTypes)) {
matching.add(handler);
}
@ -198,7 +194,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testMultipleSignatureListenerWithoutInheritance() {
MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class, 0.85F, 4);
MessageHandler[] allHandlers = MessageHandler.get(MultiMessageListener1.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(7, String.class)
.expectHandlers(9, String.class, String.class)
@ -211,7 +207,7 @@ public class MetadataReaderTest extends AssertSupport {
.expectHandlers(2, String.class, Object.class)
.expectHandlers(2, String.class, Object[].class)
;
validator.check(listener);
validator.check(allHandlers);
}
@SuppressWarnings("unused")

View File

@ -13,13 +13,11 @@ import java.util.concurrent.LinkedTransferQueue;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MessageListener;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription;
@ -28,7 +26,9 @@ public class PerfTest_Collections {
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final float LOAD_FACTOR = 0.8F;
private static final MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class, LOAD_FACTOR, 8);
private static final MessageHandler[] allHandlers = MessageHandler.get(Listener.class);
public static void main(final String[] args) throws Exception {
final int size = 16;
@ -73,10 +73,8 @@ public class PerfTest_Collections {
final int warmupRuns = 2;
final int runs = 3;
Collection<MessageHandler> handlers = messageListener.getHandlers();
for (int i=0;i<size;i++) {
for (MessageHandler x : handlers) {
for (MessageHandler x : allHandlers) {
set.add(new Subscription(x));
}
}

View File

@ -1,10 +1,16 @@
package dorkbox.util.messagebus.common;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.*;
/**
*
* @author bennidi
@ -51,7 +57,7 @@ public class SubscriptionValidator extends AssertSupport{
Subscription matchingSub = null;
// one of the subscriptions must belong to the subscriber type
for(Subscription sub : collection){
if(sub.belongsTo(validationValidationEntry.subscriber)){
if(belongsTo(sub, validationValidationEntry.subscriber)){
matchingSub = sub;
break;
}
@ -67,21 +73,23 @@ public class SubscriptionValidator extends AssertSupport{
* Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
public boolean belongsTo(Subscription subscription, Class<?> listener) {
// only in unit test
public boolean isFromListener(Class<?> listener){
return this.listenerConfig.isFromListener(listener);
}
// return this.handlerMetadata.isFromListener(listener);
//
//
// // only in unit test
// public boolean isFromListener(Class<?> listener){
// return this.listenerConfig.isFromListener(listener);
// }
return false;
}
// only in unit test
public boolean isFromListener(Class<?> listener) {
return this.listenerDefinition.equals(listener);
}
// public boolean isFromListener(Class<?> listener) {
// return this.listenerDefinition.equals(listener);
// }
private Collection<ValidationEntry> getEntries(Class<?> messageType){