Now have 300ms publication for both 2/4million handled events

This commit is contained in:
nathan 2015-02-20 15:43:39 +01:00
parent 758a1c837a
commit 2be96d7ff6
15 changed files with 514 additions and 358 deletions

View File

@ -40,7 +40,7 @@ public class MultiMBassador implements IMessageBus {
private List<Thread> threads; private List<Thread> threads;
public MultiMBassador() { public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()*2); this(Runtime.getRuntime().availableProcessors());
// this(2); // this(2);
} }
@ -134,6 +134,7 @@ public class MultiMBassador implements IMessageBus {
SubscriptionManager manager = this.subscriptionManager; SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass(); Class<?> messageClass = message.getClass();
// manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
// Run subscriptions // Run subscriptions
@ -166,6 +167,7 @@ public class MultiMBassador implements IMessageBus {
sub.publishToSubscription(this, message); sub.publishToSubscription(this, message);
} }
} }
// manager.readUnLock();
} }
@SuppressWarnings("null") @SuppressWarnings("null")
@ -184,7 +186,7 @@ public class MultiMBassador implements IMessageBus {
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
} }
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// Run subscriptions // Run subscriptions
@ -205,13 +207,13 @@ public class MultiMBassador implements IMessageBus {
} }
// now get superClasses // // now get superClasses
if (superSubscriptions != null) { // if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) { // for (Subscription sub : superSubscriptions) {
// this catches all exception types // // this catches all exception types
sub.publishToSubscription(this, message1, message2); // sub.publishToSubscription(this, message1, message2);
} // }
} // }
} }
@SuppressWarnings("null") @SuppressWarnings("null")
@ -231,7 +233,7 @@ public class MultiMBassador implements IMessageBus {
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
} }
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// Run subscriptions // Run subscriptions
@ -252,13 +254,13 @@ public class MultiMBassador implements IMessageBus {
} }
// now get superClasses // // now get superClasses
if (superSubscriptions != null) { // if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) { // for (Subscription sub : superSubscriptions) {
// this catches all exception types // // this catches all exception types
sub.publishToSubscription(this, message1, message2, message3); // sub.publishToSubscription(this, message1, message2, message3);
} // }
} // }
} }

View File

@ -128,7 +128,7 @@ public class ReflectionUtils
} }
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){ public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){
return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap<AnnotatedElement>(8, .8f))); return getAnnotation(from, annotationType, Collections.newSetFromMap(new Object2BooleanOpenHashMap<AnnotatedElement>(8, 0.8F)));
} }
// //

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.multi.dispatch; package net.engio.mbassy.multi.dispatch;
import java.lang.reflect.Method; import com.esotericsoftware.reflectasm.MethodAccess;
/** /**
* A handler invocation encapsulates the logic that is used to invoke a single * A handler invocation encapsulates the logic that is used to invoke a single
@ -28,7 +28,7 @@ public interface IHandlerInvocation {
* type that the handler consumes * type that the handler consumes
* @param handler The handler (method) that will be called via reflection * @param handler The handler (method) that will be called via reflection
*/ */
void invoke(Object listener, Method handler, Object message) throws Throwable; void invoke(Object listener, MethodAccess handler, int methodIndex, Object message) throws Throwable;
/** /**
* Invoke the message delivery logic of this handler * Invoke the message delivery logic of this handler
@ -39,7 +39,7 @@ public interface IHandlerInvocation {
* type that the handler consumes * type that the handler consumes
* @param handler The handler (method) that will be called via reflection * @param handler The handler (method) that will be called via reflection
*/ */
void invoke(Object listener, Method handler, Object message1, Object message2) throws Throwable; void invoke(Object listener, MethodAccess handler, int methodIndex, Object message1, Object message2) throws Throwable;
/** /**
* Invoke the message delivery logic of this handler * Invoke the message delivery logic of this handler
@ -50,7 +50,7 @@ public interface IHandlerInvocation {
* type that the handler consumes * type that the handler consumes
* @param handler The handler (method) that will be called via reflection * @param handler The handler (method) that will be called via reflection
*/ */
void invoke(Object listener, Method handler, Object message1, Object message2, Object message3) throws Throwable; void invoke(Object listener, MethodAccess handler, int methodIndex, Object message1, Object message2, Object message3) throws Throwable;
/** /**
* Invoke the message delivery logic of this handler * Invoke the message delivery logic of this handler
@ -61,5 +61,5 @@ public interface IHandlerInvocation {
* type that the handler consumes * type that the handler consumes
* @param handler The handler (method) that will be called via reflection * @param handler The handler (method) that will be called via reflection
*/ */
void invoke(Object listener, Method handler, Object... message) throws Throwable; void invoke(Object listener, MethodAccess handler, int methodIndex, Object... message) throws Throwable;
} }

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.multi.dispatch; package net.engio.mbassy.multi.dispatch;
import java.lang.reflect.Method; import com.esotericsoftware.reflectasm.MethodAccess;
/** /**
* Uses reflection to invoke a message handler for a given message. * Uses reflection to invoke a message handler for a given message.
@ -17,22 +17,22 @@ public class ReflectiveHandlerInvocation implements IHandlerInvocation {
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message) throws Throwable { public void invoke(final Object listener, final MethodAccess handler, final int methodIndex, final Object message) throws Throwable {
handler.invoke(listener, message); handler.invoke(listener, methodIndex, message);
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message1, final Object message2) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2) throws Throwable {
handler.invoke(listener, message1, message2); handler.invoke(listener, methodIndex, message1, message2);
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message1, final Object message2, final Object message3) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2, final Object message3) throws Throwable {
handler.invoke(listener, message1, message2, message3); handler.invoke(listener, methodIndex, message1, message2, message3);
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object... messages) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object... messages) throws Throwable {
handler.invoke(listener, messages); handler.invoke(listener, methodIndex, messages);
} }
} }

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.multi.dispatch; package net.engio.mbassy.multi.dispatch;
import java.lang.reflect.Method; import com.esotericsoftware.reflectasm.MethodAccess;
/** /**
* Synchronizes message handler invocations for all handlers that specify @Synchronized * Synchronizes message handler invocations for all handlers that specify @Synchronized
@ -19,30 +19,30 @@ public class SynchronizedHandlerInvocation implements IHandlerInvocation {
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message) throws Throwable { public void invoke(final Object listener, final MethodAccess handler, final int methodIndex, final Object message) throws Throwable {
synchronized (listener) { synchronized (listener) {
this.delegate.invoke(listener, handler, message); this.delegate.invoke(listener, handler, methodIndex, message);
} }
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message1, final Object message2) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2) throws Throwable {
synchronized (listener) { synchronized (listener) {
this.delegate.invoke(listener, handler, message1, message2); this.delegate.invoke(listener, handler, methodIndex, message1, message2);
} }
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object message1, final Object message2, final Object message3) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object message1, final Object message2, final Object message3) throws Throwable {
synchronized (listener) { synchronized (listener) {
this.delegate.invoke(listener, handler, message1, message2, message3); this.delegate.invoke(listener, handler, methodIndex, message1, message2, message3);
} }
} }
@Override @Override
public void invoke(final Object listener, final Method handler, final Object... messages) throws Throwable { public void invoke(final Object listener, MethodAccess handler, int methodIndex, final Object... messages) throws Throwable {
synchronized (listener) { synchronized (listener) {
this.delegate.invoke(listener, handler, messages); this.delegate.invoke(listener, handler, methodIndex, messages);
} }
} }
} }

View File

@ -7,6 +7,8 @@ import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Synchronized; import net.engio.mbassy.multi.annotations.Synchronized;
import net.engio.mbassy.multi.common.ReflectionUtils; import net.engio.mbassy.multi.common.ReflectionUtils;
import com.esotericsoftware.reflectasm.MethodAccess;
/** /**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains * Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy * the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
@ -28,14 +30,16 @@ import net.engio.mbassy.multi.common.ReflectionUtils;
*/ */
public class MessageHandler { public class MessageHandler {
private final Method handler; private final MethodAccess handler;
private final int methodIndex;
private final Class<?>[] handledMessages; private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes; private final boolean acceptsSubtypes;
private final MessageListener listenerConfig; private final MessageListener listenerConfig;
private final boolean isSynchronized; private final boolean isSynchronized;
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){ public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata) {
super(); super();
if (handler == null) { if (handler == null) {
@ -44,8 +48,8 @@ public class MessageHandler {
Class<?>[] handledMessages = handler.getParameterTypes(); Class<?>[] handledMessages = handler.getParameterTypes();
this.handler = handler; this.handler = MethodAccess.get(handler.getDeclaringClass());
handler.setAccessible(true); this.methodIndex = this.handler.getIndex(handler.getName(), handledMessages);
this.acceptsSubtypes = !handlerConfig.rejectSubtypes(); this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
this.listenerConfig = listenerMetadata; this.listenerConfig = listenerMetadata;
@ -61,10 +65,14 @@ public class MessageHandler {
return this.listenerConfig.isFromListener(listener); return this.listenerConfig.isFromListener(listener);
} }
public Method getHandler() { public MethodAccess getHandler() {
return this.handler; return this.handler;
} }
public int getMethodIndex() {
return this.methodIndex;
}
public Class<?>[] getHandledMessages() { public Class<?>[] getHandledMessages() {
return this.handledMessages; return this.handledMessages;
} }

View File

@ -48,7 +48,6 @@ public class MetadataReader {
// if a handler is overwritten it inherits the configuration of its parent method // if a handler is overwritten it inherits the configuration of its parent method
MessageHandler handlerMetadata = new MessageHandler(overriddenHandler, handlerConfig, listenerMetadata); MessageHandler handlerMetadata = new MessageHandler(overriddenHandler, handlerConfig, listenerMetadata);
listenerMetadata.addHandler(handlerMetadata); listenerMetadata.addHandler(handlerMetadata);
} }
return listenerMetadata; return listenerMetadata;
} }

View File

@ -1,8 +1,6 @@
package net.engio.mbassy.multi.subscription; package net.engio.mbassy.multi.subscription;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.common.StrongConcurrentSet;
@ -10,9 +8,10 @@ import net.engio.mbassy.multi.dispatch.IHandlerInvocation;
import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.multi.error.ErrorHandlingSupport; import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MessageHandler;
import com.esotericsoftware.reflectasm.MethodAccess;
/** /**
* A subscription is a thread-safe container that manages exactly one message handler of all registered * A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
@ -88,208 +87,207 @@ public class Subscription {
Collection<Object> listeners = this.listeners; Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
Method handler = this.handlerMetadata.getHandler(); MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation; IHandlerInvocation invocation = this.invocation;
try { try {
for (Object listener : listeners) { for (Object listener : listeners) {
invocation.invoke(listener, handler, message); invocation.invoke(listener, handler, handleIndex, message);
} }
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError() e.printStackTrace();
.setMessage("Error during invocation of message handler. " + // errorHandler.handlePublicationError(new PublicationError()
"The class or method is not accessible") // .setMessage("Error during invocation of message handler. " +
.setCause(e) // "The class or method is not accessible")
.setMethodName(handler.getName()) // .setCause(e)
// .setListener(listener) // .setMethodName(handler.getName())
.setPublishedObject(message)); //// .setListener(listener)
// .setPublishedObject(message));
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError() e.printStackTrace();
.setMessage("Error during invocation of message handler. " + // errorHandler.handlePublicationError(new PublicationError()
"Wrong arguments passed to method. Was: " + message.getClass() // .setMessage("Error during invocation of message handler. " +
+ "Expected: " + handler.getParameterTypes()[0]) // "Wrong arguments passed to method. Was: " + message.getClass()
.setCause(e) // + "Expected: " + handler.getParameterTypes()[0])
.setMethodName(handler.getName()) // .setCause(e)
// .setListener(listener) // .setMethodName(handler.getName())
.setPublishedObject(message)); //// .setListener(listener)
// .setPublishedObject(message));
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError() e.printStackTrace();
.setMessage("Error during invocation of message handler. " + // errorHandler.handlePublicationError(new PublicationError()
"Message handler threw exception") // .setMessage("Error during invocation of message handler. " +
.setCause(e) // "Message handler threw exception")
.setMethodName(handler.getName()) // .setCause(e)
// .setListener(listener) // .setMethodName(handler.getName())
.setPublishedObject(message)); //// .setListener(listener)
// .setPublishedObject(message));
} catch (Throwable e) { } catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError() e.printStackTrace();
.setMessage("Error during invocation of message handler. " + // errorHandler.handlePublicationError(new PublicationError()
"The handler code threw an exception") // .setMessage("Error during invocation of message handler. " +
.setCause(e) // "The handler code threw an exception")
.setMethodName(handler.getName()) // .setCause(e)
// .setListener(listener) // .setMethodName(handler.getName())
.setPublishedObject(message)); //// .setListener(listener)
// .setPublishedObject(message));
} }
} }
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners;
Collection<Object> listeners = this.listeners; //
// IConcurrentSet<Object> listeners = this.listeners; // if (listeners.size() > 0) {
// MethodHandle handler = this.handlerMetadata.getHandler();
if (listeners.size() > 0) { //
Method handler = this.handlerMetadata.getHandler(); // for (Object listener : listeners) {
// try {
for (Object listener : listeners) { // this.invocation.invoke(listener, handler, message1, message2);
try { // } catch (IllegalAccessException e) {
this.invocation.invoke(listener, handler, message1, message2); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalAccessException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The class or method is not accessible")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The class or method is not accessible") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2));
.setListener(listener) // } catch (IllegalArgumentException e) {
.setPublishedObject(message1, message2)); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalArgumentException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Wrong arguments passed to method. Was: " +
.setMessage("Error during invocation of message handler. " + // message1.getClass() + ", " +
"Wrong arguments passed to method. Was: " + // message2.getClass()
message1.getClass() + ", " + // + ". Expected: " + handler.getParameterTypes()[0] + ", " +
message2.getClass() // handler.getParameterTypes()[1]
+ ". Expected: " + handler.getParameterTypes()[0] + ", " + // )
handler.getParameterTypes()[1] // .setCause(e)
) // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2));
.setListener(listener) // } catch (InvocationTargetException e) {
.setPublishedObject(message1, message2)); // errorHandler.handlePublicationError(new PublicationError()
} catch (InvocationTargetException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Message handler threw exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"Message handler threw exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2));
.setListener(listener) // } catch (Throwable e) {
.setPublishedObject(message1, message2)); // errorHandler.handlePublicationError(new PublicationError()
} catch (Throwable e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The handler code threw an exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The handler code threw an exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2));
.setListener(listener) // }
.setPublishedObject(message1, message2)); // }
} // }
}
}
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners;
Collection<Object> listeners = this.listeners; //
// IConcurrentSet<Object> listeners = this.listeners; // if (listeners.size() > 0) {
// Method handler = this.handlerMetadata.getHandler();
if (listeners.size() > 0) { //
Method handler = this.handlerMetadata.getHandler(); // for (Object listener : listeners) {
// try {
for (Object listener : listeners) { // this.invocation.invoke(listener, handler, message1, message2, message3);
try { // } catch (IllegalAccessException e) {
this.invocation.invoke(listener, handler, message1, message2, message3); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalAccessException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The class or method is not accessible")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The class or method is not accessible") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (IllegalArgumentException e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalArgumentException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Wrong arguments passed to method. Was: " +
.setMessage("Error during invocation of message handler. " + // message1.getClass() + ", " +
"Wrong arguments passed to method. Was: " + // message2.getClass() + ", " +
message1.getClass() + ", " + // message3.getClass()
message2.getClass() + ", " + // + ". Expected: " + handler.getParameterTypes()[0] + ", " +
message3.getClass() // handler.getParameterTypes()[1] + ", " +
+ ". Expected: " + handler.getParameterTypes()[0] + ", " + // handler.getParameterTypes()[2]
handler.getParameterTypes()[1] + ", " + // )
handler.getParameterTypes()[2] // .setCause(e)
) // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (InvocationTargetException e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (InvocationTargetException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Message handler threw exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"Message handler threw exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (Throwable e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (Throwable e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The handler code threw an exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The handler code threw an exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // }
.setPublishedObject(message1, message2, message3)); // }
} // }
}
}
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners;
Collection<Object> listeners = this.listeners; //
// IConcurrentSet<Object> listeners = this.listeners; // if (listeners.size() > 0) {
// Method handler = this.handlerMetadata.getHandler();
if (listeners.size() > 0) { //
Method handler = this.handlerMetadata.getHandler(); // for (Object listener : listeners) {
// try {
for (Object listener : listeners) { // this.invocation.invoke(listener, handler, messages);
try { // } catch (IllegalAccessException e) {
this.invocation.invoke(listener, handler, messages); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalAccessException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The class or method is not accessible")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The class or method is not accessible") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(messages));
.setListener(listener) // } catch (IllegalArgumentException e) {
.setPublishedObject(messages)); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalArgumentException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages)
.setMessage("Error during invocation of message handler. " + // + "Expected: " + Arrays.deepToString(handler.getParameterTypes()))
"Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) // .setCause(e)
+ "Expected: " + Arrays.deepToString(handler.getParameterTypes())) // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(messages));
.setListener(listener) // } catch (InvocationTargetException e) {
.setPublishedObject(messages)); // errorHandler.handlePublicationError(new PublicationError()
} catch (InvocationTargetException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Message handler threw exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"Message handler threw exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(messages));
.setListener(listener) // } catch (InterruptedException e) {
.setPublishedObject(messages)); // Thread.currentThread().interrupt();
} catch (InterruptedException e) { // return;
Thread.currentThread().interrupt(); // } catch (Throwable e) {
return; // errorHandler.handlePublicationError(new PublicationError()
} catch (Throwable e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The handler code threw an exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The handler code threw an exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(messages));
.setListener(listener) // }
.setPublishedObject(messages)); // }
} // }
}
}
} }
@Override @Override

View File

@ -1,9 +1,10 @@
package net.engio.mbassy.multi.subscription; package net.engio.mbassy.multi.subscription;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap; import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry;
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet;
import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -53,7 +54,7 @@ public class SubscriptionManager {
private final Object holder = new Object[0]; private final Object holder = new Object[0];
private final Map<Class<?>, Collection<Class<?>>> superClassesCache; private final Map<Class<?>, FastEntrySet<Class<?>>> superClassesCache;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
@ -65,9 +66,10 @@ public class SubscriptionManager {
// remember already processed classes that do not contain any message handlers // remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Object> nonListeners; private final Map<Class<?>, Object> nonListeners;
// synchronize read/write acces to the subscription maps // synchronize read/write access to the subscription maps
private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
public SubscriptionManager(int numberOfThreads) { public SubscriptionManager(int numberOfThreads) {
this.MAP_STRIPING = numberOfThreads; this.MAP_STRIPING = numberOfThreads;
this.LOAD_FACTOR = 0.8f; this.LOAD_FACTOR = 0.8f;
@ -78,7 +80,7 @@ public class SubscriptionManager {
// only used during SUB/UNSUB // only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1); this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
this.superClassesCache = new ConcurrentHashMap<Class<?>, Collection<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); this.superClassesCache = new ConcurrentHashMap<Class<?>, FastEntrySet<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
} }
@ -354,14 +356,17 @@ public class SubscriptionManager {
FastEntrySet<Subscription> subsPerType = superClassSubs.get(superType); FastEntrySet<Subscription> subsPerType = superClassSubs.get(superType);
if (subsPerType == null) { if (subsPerType == null) {
Collection<Class<?>> types = this.superClassesCache.get(superType); FastEntrySet<Class<?>> types = this.superClassesCache.get(superType);
if (types == null || types.isEmpty()) { if (types == null || types.isEmpty()) {
return null; return null;
} }
Reference2BooleanArrayMap<Subscription> map = new Reference2BooleanArrayMap<Subscription>(types.size() + 1); Reference2BooleanArrayMap<Subscription> map = new Reference2BooleanArrayMap<Subscription>(types.size() + 1);
for (Class<?> superClass : types) { ObjectIterator<Entry<Class<?>>> fastIterator = types.fastIterator();
while (fastIterator.hasNext()) {
Class<?> superClass = fastIterator.next().getKey();
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
if (subs != null && !subs.isEmpty()) { if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
@ -381,143 +386,146 @@ public class SubscriptionManager {
// must be protected by read lock // must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) { public void getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
// Collection<Subscription> subsPerType2 = this.superClassSubscriptions.get(); // Collection<Subscription> subsPerType2 = this.superClassSubscriptions.get();
//
//
// // not thread safe. DO NOT MODIFY
// Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
// Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
//
// Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
//
// Collection<Subscription> subs;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
//
// Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
// Iterator<Class<?>> iterator2;
//
// Class<?> eventSuperType1;
// Class<?> eventSuperType2;
//
// while (iterator1.hasNext()) {
// eventSuperType1 = iterator1.next();
// boolean type1Matches = eventSuperType1 == superType1;
//
// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
// if (leaf1 != null) {
// iterator2 = new SuperClassIterator(superType2, types2);
//
// while (iterator2.hasNext()) {
// eventSuperType2 = iterator2.next();
// if (type1Matches && eventSuperType2 == superType2) {
// continue;
// }
//
// leaf2 = leaf1.getLeaf(eventSuperType2);
//
// if (leaf2 != null) {
// subs = leaf2.getValue();
// if (subs != null) {
// for (Subscription sub : subs) {
// if (sub.acceptsSubtypes()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// }
// }
// }
// return subsPerType;
// not thread safe. DO NOT MODIFY
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
Collection<Subscription> subs;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2);
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
return subsPerType;
} }
// must be protected by read lock // must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) { public void getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
// not thread safe. DO NOT MODIFY // // not thread safe. DO NOT MODIFY
Collection<Class<?>> types1 = this.superClassesCache.get(superType1); // Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2); // Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Class<?>> types3 = this.superClassesCache.get(superType3); // Collection<Class<?>> types3 = this.superClassesCache.get(superType3);
//
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE); // Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
//
Collection<Subscription> subs; // Collection<Subscription> subs;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1; // IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2; // IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf3; // IdentityObjectTree<Class<?>, Collection<Subscription>> leaf3;
//
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1); // Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2; // Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3; // Iterator<Class<?>> iterator3;
//
Class<?> eventSuperType1; // Class<?> eventSuperType1;
Class<?> eventSuperType2; // Class<?> eventSuperType2;
Class<?> eventSuperType3; // Class<?> eventSuperType3;
//
while (iterator1.hasNext()) { // while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next(); // eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1; // boolean type1Matches = eventSuperType1 == superType1;
//
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); // leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) { // if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2); // iterator2 = new SuperClassIterator(superType2, types2);
//
while (iterator2.hasNext()) { // while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next(); // eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2; // boolean type12Matches = type1Matches && eventSuperType2 == superType2;
//
leaf2 = leaf1.getLeaf(eventSuperType2); // leaf2 = leaf1.getLeaf(eventSuperType2);
//
if (leaf2 != null) { // if (leaf2 != null) {
iterator3 = new SuperClassIterator(superType3, types3); // iterator3 = new SuperClassIterator(superType3, types3);
//
while (iterator3.hasNext()) { // while (iterator3.hasNext()) {
eventSuperType3 = iterator3.next(); // eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) { // if (type12Matches && eventSuperType3 == superType3) {
continue; // continue;
} // }
//
leaf3 = leaf2.getLeaf(eventSuperType3); // leaf3 = leaf2.getLeaf(eventSuperType3);
//
subs = leaf3.getValue(); // subs = leaf3.getValue();
if (subs != null) { // if (subs != null) {
for (Subscription sub : subs) { // for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) { // if (sub.acceptsSubtypes()) {
subsPerType.add(sub); // subsPerType.add(sub);
} // }
} // }
} // }
} // }
} // }
} // }
} // }
} // }
//
return subsPerType; // return subsPerType;
} }
/** /**
* race conditions will result in duplicate answers, which we don't care if happens * race conditions will result in duplicate answers, which we don't care if happens
*/ */
private Collection<Class<?>> setupSuperClassCache(Class<?> clazz) { private void setupSuperClassCache(Class<?> clazz) {
Collection<Class<?>> types = this.superClassesCache.get(clazz); if (!this.superClassesCache.containsKey(clazz)) {
if (types == null) {
// it doesn't matter if concurrent access stomps on values, since they are always the same. // it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
// types = new ArrayDeque<Class<?>>(superTypes); // types = new ArrayDeque<Class<?>>(superTypes);
types = new StrongConcurrentSet<Class<?>>(superTypes.size(), this.LOAD_FACTOR); // types = new StrongConcurrentSet<Class<?>>(superTypes.size(), this.LOAD_FACTOR);
types.addAll(superTypes); // types.addAll(superTypes);
Reference2BooleanArrayMap<Class<?>> map = new Reference2BooleanArrayMap<Class<?>>(superTypes.size() + 1);
for (Class<?> c : superTypes) {
map.put(c, Boolean.TRUE);
}
FastEntrySet<Class<?>> fastSet = map.reference2BooleanEntrySet();
// race conditions will result in duplicate answers, which we don't care about // race conditions will result in duplicate answers, which we don't care about
this.superClassesCache.put(clazz, types); this.superClassesCache.put(clazz, fastSet);
} }
return types;
} }
public static class SuperClassIterator implements Iterator<Class<?>> { public static class SuperClassIterator implements Iterator<Class<?>> {

View File

@ -3,6 +3,8 @@ package net.engio.mbassy.multi;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest; import net.engio.mbassy.multi.common.MessageBusTest;

View File

@ -0,0 +1,133 @@
package net.engio.mbassy.multi;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import com.esotericsoftware.reflectasm.MethodAccess;
// from: https://stackoverflow.com/questions/14146570/calling-a-getter-in-java-though-reflection-whats-the-fastest-way-to-repeatedly/14146919#14146919
// modified by dorkbox
public abstract class BenchmarkReflection {
final String name;
public BenchmarkReflection(String name) {
this.name = name;
}
abstract int run(int iterations) throws Throwable;
private BigDecimal time() {
try {
int nextI = 1;
int i;
long duration;
do {
i = nextI;
long start = System.nanoTime();
run(i);
duration = System.nanoTime() - start;
nextI = i << 1 | 1;
} while (duration < 100000000 && nextI > 0);
return new BigDecimal(duration * 1000 / i).movePointLeft(3);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return this.name + "\t" + time() + " ns";
}
static class C {
public Integer foo() {
return 1;
}
}
static final MethodHandle sfmh;
static {
try {
Method m = C.class.getMethod("foo");
sfmh = MethodHandles.lookup().unreflect(m);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws Exception {
final C invocationTarget = new C();
final Method m = C.class.getMethod("foo");
final Method am = C.class.getMethod("foo");
am.setAccessible(true);
final MethodHandle mh = sfmh;
final MethodAccess ma = MethodAccess.get(C.class);
final int mi = ma.getIndex("foo");
BenchmarkReflection[] marks = {
new BenchmarkReflection("reflective invocation (without setAccessible)") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += (Integer) m.invoke(invocationTarget);
}
return x;
}
},
new BenchmarkReflection("reflective invocation (with setAccessible)") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += (Integer) am.invoke(invocationTarget);
}
return x;
}
},
new BenchmarkReflection("reflectASM invocation") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += (Integer) ma.invoke(invocationTarget, mi, (Object)null);
}
return x;
}
},
new BenchmarkReflection("methodhandle invocation") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += (Integer) mh.invokeExact(invocationTarget);
}
return x;
}
},
new BenchmarkReflection("static final methodhandle invocation") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += (Integer) sfmh.invokeExact(invocationTarget);
}
return x;
}
},
new BenchmarkReflection("direct invocation") {
@Override int run(int iterations) throws Throwable {
int x = 0;
for (int i = 0; i < iterations; i++) {
x += invocationTarget.foo();
}
return x;
}
},
};
for (BenchmarkReflection bm : marks) {
System.out.println(bm);
}
}
}

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest; import net.engio.mbassy.multi.common.MessageBusTest;

View File

@ -5,6 +5,8 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest; import net.engio.mbassy.multi.common.MessageBusTest;

View File

@ -4,6 +4,7 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import junit.framework.Assert; import junit.framework.Assert;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.error.PublicationError;
@ -42,7 +43,7 @@ public class PerformanceTest {
long num = Long.MAX_VALUE; long num = Long.MAX_VALUE;
while (num-- > 0) { while (num-- > 0) {
bus.publish("s"); bus.publishAsync("s");
} }
// bus.publish("s", "s"); // bus.publish("s", "s");

View File

@ -2,6 +2,8 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest; import net.engio.mbassy.multi.common.MessageBusTest;