From 2be96d7ff6b0085cda94bce29b5c53e5d43ec15e Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 20 Feb 2015 15:43:39 +0100 Subject: [PATCH] Now have 300ms publication for both 2/4million handled events --- .../engio/mbassy/multi/MultiMBassador.java | 36 +- .../mbassy/multi/common/ReflectionUtils.java | 2 +- .../multi/dispatch/IHandlerInvocation.java | 10 +- .../dispatch/ReflectiveHandlerInvocation.java | 18 +- .../SynchronizedHandlerInvocation.java | 18 +- .../mbassy/multi/listener/MessageHandler.java | 18 +- .../mbassy/multi/listener/MetadataReader.java | 1 - .../multi/subscription/Subscription.java | 368 +++++++++--------- .../subscription/SubscriptionManager.java | 258 ++++++------ .../engio/mbassy/multi/AsyncFIFOBusTest.java | 2 + .../mbassy/multi/BenchmarkReflection.java | 133 +++++++ .../net/engio/mbassy/multi/MBassadorTest.java | 1 + .../engio/mbassy/multi/MultiMessageTest.java | 2 + .../engio/mbassy/multi/PerformanceTest.java | 3 +- .../net/engio/mbassy/multi/SyncBusTest.java | 2 + 15 files changed, 514 insertions(+), 358 deletions(-) create mode 100644 src/test/java/net/engio/mbassy/multi/BenchmarkReflection.java diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index b87160a..161a254 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -40,7 +40,7 @@ public class MultiMBassador implements IMessageBus { private List threads; public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()*2); + this(Runtime.getRuntime().availableProcessors()); // this(2); } @@ -134,6 +134,7 @@ public class MultiMBassador implements IMessageBus { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); +// manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); // Run subscriptions @@ -166,6 +167,7 @@ public class MultiMBassador implements IMessageBus { sub.publishToSubscription(this, message); } } +// manager.readUnLock(); } @SuppressWarnings("null") @@ -184,7 +186,7 @@ public class MultiMBassador implements IMessageBus { deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); +// Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // Run subscriptions @@ -205,13 +207,13 @@ public class MultiMBassador implements IMessageBus { } - // now get superClasses - if (superSubscriptions != null) { - for (Subscription sub : superSubscriptions) { - // this catches all exception types - sub.publishToSubscription(this, message1, message2); - } - } +// // now get superClasses +// if (superSubscriptions != null) { +// for (Subscription sub : superSubscriptions) { +// // this catches all exception types +// sub.publishToSubscription(this, message1, message2); +// } +// } } @SuppressWarnings("null") @@ -231,7 +233,7 @@ public class MultiMBassador implements IMessageBus { deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); +// Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // Run subscriptions @@ -252,13 +254,13 @@ public class MultiMBassador implements IMessageBus { } - // now get superClasses - if (superSubscriptions != null) { - for (Subscription sub : superSubscriptions) { - // this catches all exception types - sub.publishToSubscription(this, message1, message2, message3); - } - } +// // now get superClasses +// if (superSubscriptions != null) { +// for (Subscription sub : superSubscriptions) { +// // this catches all exception types +// sub.publishToSubscription(this, message1, message2, message3); +// } +// } } diff --git a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java index 5f945e5..da9e88f 100644 --- a/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/multi/common/ReflectionUtils.java @@ -128,7 +128,7 @@ public class ReflectionUtils } public static A getAnnotation( AnnotatedElement from, Class annotationType){ - return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap(8, .8f))); + return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap(8, 0.8F))); } // diff --git a/src/main/java/net/engio/mbassy/multi/dispatch/IHandlerInvocation.java b/src/main/java/net/engio/mbassy/multi/dispatch/IHandlerInvocation.java index f3a6a3f..02e0473 100644 --- a/src/main/java/net/engio/mbassy/multi/dispatch/IHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/multi/dispatch/IHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.multi.dispatch; -import java.lang.reflect.Method; +import com.esotericsoftware.reflectasm.MethodAccess; /** * A handler invocation encapsulates the logic that is used to invoke a single @@ -28,7 +28,7 @@ public interface IHandlerInvocation { * type that the handler consumes * @param handler The handler (method) that will be called via reflection */ - void invoke(Object listener, Method handler, Object message) throws Throwable; + void invoke(Object listener, MethodAccess handler, int methodIndex, Object message) throws Throwable; /** * Invoke the message delivery logic of this handler @@ -39,7 +39,7 @@ public interface IHandlerInvocation { * type that the handler consumes * @param handler The handler (method) that will be called via reflection */ - void invoke(Object listener, Method handler, Object message1, Object message2) throws Throwable; + void invoke(Object listener, MethodAccess handler, int methodIndex, Object message1, Object message2) throws Throwable; /** * Invoke the message delivery logic of this handler @@ -50,7 +50,7 @@ public interface IHandlerInvocation { * type that the handler consumes * @param handler The handler (method) that will be called via reflection */ - void invoke(Object listener, Method handler, Object message1, Object message2, Object message3) throws Throwable; + void invoke(Object listener, MethodAccess handler, int methodIndex, Object message1, Object message2, Object message3) throws Throwable; /** * Invoke the message delivery logic of this handler @@ -61,5 +61,5 @@ public interface IHandlerInvocation { * type that the handler consumes * @param handler The handler (method) that will be called via reflection */ - void invoke(Object listener, Method handler, Object... message) throws Throwable; + void invoke(Object listener, MethodAccess handler, int methodIndex, Object... message) throws Throwable; } diff --git a/src/main/java/net/engio/mbassy/multi/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/multi/dispatch/ReflectiveHandlerInvocation.java index f0f5887..237d376 100644 --- a/src/main/java/net/engio/mbassy/multi/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/multi/dispatch/ReflectiveHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.multi.dispatch; -import java.lang.reflect.Method; +import com.esotericsoftware.reflectasm.MethodAccess; /** * Uses reflection to invoke a message handler for a given message. @@ -17,22 +17,22 @@ public class ReflectiveHandlerInvocation implements IHandlerInvocation { } @Override - public void invoke(final Object listener, final Method handler, final Object message) throws Throwable { - handler.invoke(listener, message); + public void invoke(final Object listener, final MethodAccess handler, final int methodIndex, final Object message) throws Throwable { + handler.invoke(listener, methodIndex, message); } @Override - public void invoke(final Object listener, final Method handler, final Object message1, final Object message2) throws Throwable { - handler.invoke(listener, message1, message2); + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2) throws Throwable { + handler.invoke(listener, methodIndex, message1, message2); } @Override - public void invoke(final Object listener, final Method handler, final Object message1, final Object message2, final Object message3) throws Throwable { - handler.invoke(listener, message1, message2, message3); + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2, final Object message3) throws Throwable { + handler.invoke(listener, methodIndex, message1, message2, message3); } @Override - public void invoke(final Object listener, final Method handler, final Object... messages) throws Throwable { - handler.invoke(listener, messages); + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object... messages) throws Throwable { + handler.invoke(listener, methodIndex, messages); } } diff --git a/src/main/java/net/engio/mbassy/multi/dispatch/SynchronizedHandlerInvocation.java b/src/main/java/net/engio/mbassy/multi/dispatch/SynchronizedHandlerInvocation.java index 1917807..67afa2e 100644 --- a/src/main/java/net/engio/mbassy/multi/dispatch/SynchronizedHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/multi/dispatch/SynchronizedHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.multi.dispatch; -import java.lang.reflect.Method; +import com.esotericsoftware.reflectasm.MethodAccess; /** * Synchronizes message handler invocations for all handlers that specify @Synchronized @@ -19,30 +19,30 @@ public class SynchronizedHandlerInvocation implements IHandlerInvocation { } @Override - public void invoke(final Object listener, final Method handler, final Object message) throws Throwable { + public void invoke(final Object listener, final MethodAccess handler, final int methodIndex, final Object message) throws Throwable { synchronized (listener) { - this.delegate.invoke(listener, handler, message); + this.delegate.invoke(listener, handler, methodIndex, message); } } @Override - public void invoke(final Object listener, final Method handler, final Object message1, final Object message2) throws Throwable { + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2) throws Throwable { synchronized (listener) { - this.delegate.invoke(listener, handler, message1, message2); + this.delegate.invoke(listener, handler, methodIndex, message1, message2); } } @Override - public void invoke(final Object listener, final Method handler, final Object message1, final Object message2, final Object message3) throws Throwable { + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2, final Object message3) throws Throwable { synchronized (listener) { - this.delegate.invoke(listener, handler, message1, message2, message3); + this.delegate.invoke(listener, handler, methodIndex, message1, message2, message3); } } @Override - public void invoke(final Object listener, final Method handler, final Object... messages) throws Throwable { + public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object... messages) throws Throwable { synchronized (listener) { - this.delegate.invoke(listener, handler, messages); + this.delegate.invoke(listener, handler, methodIndex, messages); } } } diff --git a/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java index 2358fe5..8412fc8 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java @@ -7,6 +7,8 @@ import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Synchronized; import net.engio.mbassy.multi.common.ReflectionUtils; +import com.esotericsoftware.reflectasm.MethodAccess; + /** * Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains * the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy @@ -28,14 +30,16 @@ import net.engio.mbassy.multi.common.ReflectionUtils; */ public class MessageHandler { - private final Method handler; + private final MethodAccess handler; + private final int methodIndex; + private final Class[] handledMessages; private final boolean acceptsSubtypes; private final MessageListener listenerConfig; private final boolean isSynchronized; - public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){ + public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata) { super(); if (handler == null) { @@ -44,8 +48,8 @@ public class MessageHandler { Class[] handledMessages = handler.getParameterTypes(); - this.handler = handler; - handler.setAccessible(true); + this.handler = MethodAccess.get(handler.getDeclaringClass()); + this.methodIndex = this.handler.getIndex(handler.getName(), handledMessages); this.acceptsSubtypes = !handlerConfig.rejectSubtypes(); this.listenerConfig = listenerMetadata; @@ -61,10 +65,14 @@ public class MessageHandler { return this.listenerConfig.isFromListener(listener); } - public Method getHandler() { + public MethodAccess getHandler() { return this.handler; } + public int getMethodIndex() { + return this.methodIndex; + } + public Class[] getHandledMessages() { return this.handledMessages; } diff --git a/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java index b2ced29..c42977f 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MetadataReader.java @@ -48,7 +48,6 @@ public class MetadataReader { // if a handler is overwritten it inherits the configuration of its parent method MessageHandler handlerMetadata = new MessageHandler(overriddenHandler, handlerConfig, listenerMetadata); listenerMetadata.addHandler(handlerMetadata); - } return listenerMetadata; } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index e8997a1..e1cb46a 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -1,8 +1,6 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; import java.util.Collection; import net.engio.mbassy.multi.common.StrongConcurrentSet; @@ -10,9 +8,10 @@ import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.multi.error.ErrorHandlingSupport; -import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.listener.MessageHandler; +import com.esotericsoftware.reflectasm.MethodAccess; + /** * A subscription is a thread-safe container that manages exactly one message handler of all registered * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class @@ -88,208 +87,207 @@ public class Subscription { Collection listeners = this.listeners; if (!listeners.isEmpty()) { - Method handler = this.handlerMetadata.getHandler(); + MethodAccess handler = this.handlerMetadata.getHandler(); + int handleIndex = this.handlerMetadata.getMethodIndex(); IHandlerInvocation invocation = this.invocation; try { for (Object listener : listeners) { - invocation.invoke(listener, handler, message); + invocation.invoke(listener, handler, handleIndex, message); } } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) -// .setListener(listener) - .setPublishedObject(message)); + e.printStackTrace(); +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +//// .setListener(listener) +// .setPublishedObject(message)); } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0]) - .setCause(e) - .setMethodName(handler.getName()) -// .setListener(listener) - .setPublishedObject(message)); + e.printStackTrace(); +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + message.getClass() +// + "Expected: " + handler.getParameterTypes()[0]) +// .setCause(e) +// .setMethodName(handler.getName()) +//// .setListener(listener) +// .setPublishedObject(message)); } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) -// .setListener(listener) - .setPublishedObject(message)); + e.printStackTrace(); +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +//// .setListener(listener) +// .setPublishedObject(message)); } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) -// .setListener(listener) - .setPublishedObject(message)); + e.printStackTrace(); +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +//// .setListener(listener) +// .setPublishedObject(message)); } } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { -// Collection listeners = this.listeners.keySet(); - Collection listeners = this.listeners; -// IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, message1, message2); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] - ) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } - } - } +// Collection listeners = this.listeners; +// +// if (listeners.size() > 0) { +// MethodHandle handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : listeners) { +// try { +// this.invocation.invoke(listener, handler, message1, message2); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] +// ) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } +// } +// } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { -// Collection listeners = this.listeners.keySet(); - Collection listeners = this.listeners; -// IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, message1, message2, message3); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() + ", " + - message3.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] + ", " + - handler.getParameterTypes()[2] - ) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } - } - } +// Collection listeners = this.listeners; +// +// if (listeners.size() > 0) { +// Method handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : listeners) { +// try { +// this.invocation.invoke(listener, handler, message1, message2, message3); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() + ", " + +// message3.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] + ", " + +// handler.getParameterTypes()[2] +// ) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } +// } +// } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { -// Collection listeners = this.listeners.keySet(); - Collection listeners = this.listeners; -// IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, messages); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) - + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } - } - } +// Collection listeners = this.listeners; +// +// if (listeners.size() > 0) { +// Method handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : listeners) { +// try { +// this.invocation.invoke(listener, handler, messages); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) +// + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// return; +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } +// } +// } } @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 05f65b1..7b0d2b3 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -1,9 +1,10 @@ package net.engio.mbassy.multi.subscription; +import it.unimi.dsi.fastutil.objects.ObjectIterator; import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap; +import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry; import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; -import java.util.ArrayDeque; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -53,7 +54,7 @@ public class SubscriptionManager { private final Object holder = new Object[0]; - private final Map, Collection>> superClassesCache; + private final Map, FastEntrySet>> superClassesCache; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers @@ -65,9 +66,10 @@ public class SubscriptionManager { // remember already processed classes that do not contain any message handlers private final Map, Object> nonListeners; - // synchronize read/write acces to the subscription maps + // synchronize read/write access to the subscription maps private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); + public SubscriptionManager(int numberOfThreads) { this.MAP_STRIPING = numberOfThreads; this.LOAD_FACTOR = 0.8f; @@ -78,7 +80,7 @@ public class SubscriptionManager { // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, 1); - this.superClassesCache = new ConcurrentHashMap, Collection>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassesCache = new ConcurrentHashMap, FastEntrySet>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.nonListeners = new ConcurrentHashMap, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); } @@ -354,14 +356,17 @@ public class SubscriptionManager { FastEntrySet subsPerType = superClassSubs.get(superType); if (subsPerType == null) { - Collection> types = this.superClassesCache.get(superType); + FastEntrySet> types = this.superClassesCache.get(superType); if (types == null || types.isEmpty()) { return null; } Reference2BooleanArrayMap map = new Reference2BooleanArrayMap(types.size() + 1); - for (Class superClass : types) { + ObjectIterator>> fastIterator = types.fastIterator(); + while (fastIterator.hasNext()) { + Class superClass = fastIterator.next().getKey(); + Collection subs = this.subscriptionsPerMessageSingle.get(superClass); if (subs != null && !subs.isEmpty()) { for (Subscription sub : subs) { @@ -381,143 +386,146 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. - public Collection getSuperSubscriptions(Class superType1, Class superType2) { + public void getSuperSubscriptions(Class superType1, Class superType2) { // Collection subsPerType2 = this.superClassSubscriptions.get(); +// +// +// // not thread safe. DO NOT MODIFY +// Collection> types1 = this.superClassesCache.get(superType1); +// Collection> types2 = this.superClassesCache.get(superType2); +// +// Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); +// +// Collection subs; +// IdentityObjectTree, Collection> leaf1; +// IdentityObjectTree, Collection> leaf2; +// +// Iterator> iterator1 = new SuperClassIterator(superType1, types1); +// Iterator> iterator2; +// +// Class eventSuperType1; +// Class eventSuperType2; +// +// while (iterator1.hasNext()) { +// eventSuperType1 = iterator1.next(); +// boolean type1Matches = eventSuperType1 == superType1; +// +// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); +// if (leaf1 != null) { +// iterator2 = new SuperClassIterator(superType2, types2); +// +// while (iterator2.hasNext()) { +// eventSuperType2 = iterator2.next(); +// if (type1Matches && eventSuperType2 == superType2) { +// continue; +// } +// +// leaf2 = leaf1.getLeaf(eventSuperType2); +// +// if (leaf2 != null) { +// subs = leaf2.getValue(); +// if (subs != null) { +// for (Subscription sub : subs) { +// if (sub.acceptsSubtypes()) { +// subsPerType.add(sub); +// } +// } +// } +// } +// } +// } +// } - - // not thread safe. DO NOT MODIFY - Collection> types1 = this.superClassesCache.get(superType1); - Collection> types2 = this.superClassesCache.get(superType2); - - Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); - - Collection subs; - IdentityObjectTree, Collection> leaf1; - IdentityObjectTree, Collection> leaf2; - - Iterator> iterator1 = new SuperClassIterator(superType1, types1); - Iterator> iterator2; - - Class eventSuperType1; - Class eventSuperType2; - - while (iterator1.hasNext()) { - eventSuperType1 = iterator1.next(); - boolean type1Matches = eventSuperType1 == superType1; - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null) { - iterator2 = new SuperClassIterator(superType2, types2); - - while (iterator2.hasNext()) { - eventSuperType2 = iterator2.next(); - if (type1Matches && eventSuperType2 == superType2) { - continue; - } - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - subs = leaf2.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - - return subsPerType; +// return subsPerType; } // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. - public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { - // not thread safe. DO NOT MODIFY - Collection> types1 = this.superClassesCache.get(superType1); - Collection> types2 = this.superClassesCache.get(superType2); - Collection> types3 = this.superClassesCache.get(superType3); - - Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); - - Collection subs; - IdentityObjectTree, Collection> leaf1; - IdentityObjectTree, Collection> leaf2; - IdentityObjectTree, Collection> leaf3; - - Iterator> iterator1 = new SuperClassIterator(superType1, types1); - Iterator> iterator2; - Iterator> iterator3; - - Class eventSuperType1; - Class eventSuperType2; - Class eventSuperType3; - - while (iterator1.hasNext()) { - eventSuperType1 = iterator1.next(); - boolean type1Matches = eventSuperType1 == superType1; - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null) { - iterator2 = new SuperClassIterator(superType2, types2); - - while (iterator2.hasNext()) { - eventSuperType2 = iterator2.next(); - boolean type12Matches = type1Matches && eventSuperType2 == superType2; - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - iterator3 = new SuperClassIterator(superType3, types3); - - while (iterator3.hasNext()) { - eventSuperType3 = iterator3.next(); - if (type12Matches && eventSuperType3 == superType3) { - continue; - } - - leaf3 = leaf2.getLeaf(eventSuperType3); - - subs = leaf3.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - } - - return subsPerType; + public void getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { +// // not thread safe. DO NOT MODIFY +// Collection> types1 = this.superClassesCache.get(superType1); +// Collection> types2 = this.superClassesCache.get(superType2); +// Collection> types3 = this.superClassesCache.get(superType3); +// +// Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); +// +// Collection subs; +// IdentityObjectTree, Collection> leaf1; +// IdentityObjectTree, Collection> leaf2; +// IdentityObjectTree, Collection> leaf3; +// +// Iterator> iterator1 = new SuperClassIterator(superType1, types1); +// Iterator> iterator2; +// Iterator> iterator3; +// +// Class eventSuperType1; +// Class eventSuperType2; +// Class eventSuperType3; +// +// while (iterator1.hasNext()) { +// eventSuperType1 = iterator1.next(); +// boolean type1Matches = eventSuperType1 == superType1; +// +// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); +// if (leaf1 != null) { +// iterator2 = new SuperClassIterator(superType2, types2); +// +// while (iterator2.hasNext()) { +// eventSuperType2 = iterator2.next(); +// boolean type12Matches = type1Matches && eventSuperType2 == superType2; +// +// leaf2 = leaf1.getLeaf(eventSuperType2); +// +// if (leaf2 != null) { +// iterator3 = new SuperClassIterator(superType3, types3); +// +// while (iterator3.hasNext()) { +// eventSuperType3 = iterator3.next(); +// if (type12Matches && eventSuperType3 == superType3) { +// continue; +// } +// +// leaf3 = leaf2.getLeaf(eventSuperType3); +// +// subs = leaf3.getValue(); +// if (subs != null) { +// for (Subscription sub : subs) { +// if (sub.acceptsSubtypes()) { +// subsPerType.add(sub); +// } +// } +// } +// } +// } +// } +// } +// } +// +// return subsPerType; } /** * race conditions will result in duplicate answers, which we don't care if happens */ - private Collection> setupSuperClassCache(Class clazz) { - Collection> types = this.superClassesCache.get(clazz); - - if (types == null) { + private void setupSuperClassCache(Class clazz) { + if (!this.superClassesCache.containsKey(clazz)) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); // types = new ArrayDeque>(superTypes); - types = new StrongConcurrentSet>(superTypes.size(), this.LOAD_FACTOR); - types.addAll(superTypes); +// types = new StrongConcurrentSet>(superTypes.size(), this.LOAD_FACTOR); +// types.addAll(superTypes); + + Reference2BooleanArrayMap> map = new Reference2BooleanArrayMap>(superTypes.size() + 1); + for (Class c : superTypes) { + map.put(c, Boolean.TRUE); + } + + FastEntrySet> fastSet = map.reference2BooleanEntrySet(); // race conditions will result in duplicate answers, which we don't care about - this.superClassesCache.put(clazz, types); + this.superClassesCache.put(clazz, fastSet); } - - return types; } public static class SuperClassIterator implements Iterator> { diff --git a/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java index aac72b5..9138c5e 100644 --- a/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java @@ -3,6 +3,8 @@ package net.engio.mbassy.multi; import java.util.LinkedList; import java.util.List; +import net.engio.mbassy.multi.IMessageBus; +import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/multi/BenchmarkReflection.java b/src/test/java/net/engio/mbassy/multi/BenchmarkReflection.java new file mode 100644 index 0000000..6155378 --- /dev/null +++ b/src/test/java/net/engio/mbassy/multi/BenchmarkReflection.java @@ -0,0 +1,133 @@ +package net.engio.mbassy.multi; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.math.BigDecimal; + +import com.esotericsoftware.reflectasm.MethodAccess; + +// from: https://stackoverflow.com/questions/14146570/calling-a-getter-in-java-though-reflection-whats-the-fastest-way-to-repeatedly/14146919#14146919 +// modified by dorkbox +public abstract class BenchmarkReflection { + + final String name; + + public BenchmarkReflection(String name) { + this.name = name; + } + + abstract int run(int iterations) throws Throwable; + + private BigDecimal time() { + try { + int nextI = 1; + int i; + long duration; + do { + i = nextI; + long start = System.nanoTime(); + run(i); + duration = System.nanoTime() - start; + nextI = i << 1 | 1; + } while (duration < 100000000 && nextI > 0); + return new BigDecimal(duration * 1000 / i).movePointLeft(3); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return this.name + "\t" + time() + " ns"; + } + + static class C { + public Integer foo() { + return 1; + } + } + + static final MethodHandle sfmh; + + static { + try { + Method m = C.class.getMethod("foo"); + sfmh = MethodHandles.lookup().unreflect(m); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) throws Exception { + final C invocationTarget = new C(); + final Method m = C.class.getMethod("foo"); + final Method am = C.class.getMethod("foo"); + am.setAccessible(true); + final MethodHandle mh = sfmh; + + final MethodAccess ma = MethodAccess.get(C.class); + final int mi = ma.getIndex("foo"); + + BenchmarkReflection[] marks = { + new BenchmarkReflection("reflective invocation (without setAccessible)") { + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += (Integer) m.invoke(invocationTarget); + } + return x; + } + }, + new BenchmarkReflection("reflective invocation (with setAccessible)") { + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += (Integer) am.invoke(invocationTarget); + } + return x; + } + }, + new BenchmarkReflection("reflectASM invocation") { + + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += (Integer) ma.invoke(invocationTarget, mi, (Object)null); + } + return x; + } + }, + new BenchmarkReflection("methodhandle invocation") { + + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += (Integer) mh.invokeExact(invocationTarget); + } + return x; + } + }, + new BenchmarkReflection("static final methodhandle invocation") { + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += (Integer) sfmh.invokeExact(invocationTarget); + } + return x; + } + }, + new BenchmarkReflection("direct invocation") { + @Override int run(int iterations) throws Throwable { + int x = 0; + for (int i = 0; i < iterations; i++) { + x += invocationTarget.foo(); + } + return x; + } + }, + }; + for (BenchmarkReflection bm : marks) { + System.out.println(bm); + } + } +} \ No newline at end of file diff --git a/src/test/java/net/engio/mbassy/multi/MBassadorTest.java b/src/test/java/net/engio/mbassy/multi/MBassadorTest.java index 5b8d44b..92673fc 100644 --- a/src/test/java/net/engio/mbassy/multi/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/multi/MBassadorTest.java @@ -2,6 +2,7 @@ package net.engio.mbassy.multi; import java.util.concurrent.atomic.AtomicInteger; +import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java b/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java index ff2ba80..a575b3d 100644 --- a/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java +++ b/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java @@ -5,6 +5,8 @@ package net.engio.mbassy.multi; import java.util.concurrent.atomic.AtomicInteger; +import net.engio.mbassy.multi.IMessageBus; +import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/multi/PerformanceTest.java b/src/test/java/net/engio/mbassy/multi/PerformanceTest.java index 55371c2..812ddbd 100644 --- a/src/test/java/net/engio/mbassy/multi/PerformanceTest.java +++ b/src/test/java/net/engio/mbassy/multi/PerformanceTest.java @@ -4,6 +4,7 @@ package net.engio.mbassy.multi; import junit.framework.Assert; +import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; @@ -42,7 +43,7 @@ public class PerformanceTest { long num = Long.MAX_VALUE; while (num-- > 0) { - bus.publish("s"); + bus.publishAsync("s"); } // bus.publish("s", "s"); diff --git a/src/test/java/net/engio/mbassy/multi/SyncBusTest.java b/src/test/java/net/engio/mbassy/multi/SyncBusTest.java index ebc7732..85e99e5 100644 --- a/src/test/java/net/engio/mbassy/multi/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/multi/SyncBusTest.java @@ -2,6 +2,8 @@ package net.engio.mbassy.multi; import java.util.concurrent.atomic.AtomicInteger; +import net.engio.mbassy.multi.IMessageBus; +import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.MessageBusTest;