Added remaining arg 1/2/3 methods. Fixed VarArg array construction for calling handlers that are VarArg. WIP disruptor handling

This commit is contained in:
nathan 2015-02-07 16:20:34 +01:00
parent c69e3c686e
commit 113ffb5e4a
26 changed files with 1001 additions and 565 deletions

View File

@ -0,0 +1,42 @@
package net.engio.mbassy;
/**
* The dead message event is published whenever no message
* handlers could be found for a given message publication.
*
* @author bennidi
* Date: 1/18/13
* @author dorkbox, llc
* Date: 2/2/15
*/
final class DeadMessage {
private Object[] relatedMessages;
DeadMessage(Object message) {
this.relatedMessages = new Object[1];
this.relatedMessages[0] = message;
}
DeadMessage(Object message1, Object message2) {
this.relatedMessages = new Object[2];
this.relatedMessages[0] = message1;
this.relatedMessages[1] = message2;
}
DeadMessage(Object message1, Object message2, Object message3 ) {
this.relatedMessages = new Object[3];
this.relatedMessages[0] = message1;
this.relatedMessages[1] = message2;
this.relatedMessages[2] = message3;
}
DeadMessage(Object[] messages) {
this.relatedMessages = messages;
}
public Object[] getMessages() {
return this.relatedMessages;
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
import net.engio.mbassy.error.ErrorHandlingSupport;
/**
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.

View File

@ -1,19 +1,29 @@
package net.engio.mbassy;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.DisruptorThreadFactory;
import net.engio.mbassy.disruptor.EventBusFactory;
import net.engio.mbassy.disruptor.EventProcessor;
import net.engio.mbassy.disruptor.MessageHolder;
import net.engio.mbassy.disruptor.MessageType;
import net.engio.mbassy.disruptor.PublicationExceptionHandler;
import net.engio.mbassy.error.IPublicationErrorHandler;
import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@ -24,7 +34,13 @@ import com.lmax.disruptor.dsl.ProducerType;
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MBassador extends AbstractPubSubSupport implements IMessageBus {
public class MBassador implements IMessageBus {
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
// any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down
private final ExecutorService executor = Executors.newCachedThreadPool(new DisruptorThreadFactory());
@ -42,66 +58,351 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
public MBassador(int numberOfThreads) {
super();
if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 thread.
}
this.subscriptionManager = new SubscriptionManager();
EventBusFactory factory = new EventBusFactory();
EventProcessor procs[] = new EventProcessor[numberOfThreads];
for (int i = 0; i < procs.length; i++) {
procs[i] = new EventProcessor(this, i, procs.length);
}
PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this);
this.disruptor = new Disruptor<MessageHolder>(factory, this.ringBufferSize, this.executor, ProducerType.MULTI, new SleepingWaitStrategy());
this.ringBuffer = this.disruptor.getRingBuffer();
// tell the disruptor to handle procs first
this.disruptor.handleEventsWith(procs);
this.ringBuffer = this.disruptor.start();
WorkHandler<MessageHolder> handlers[] = new EventProcessor[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new EventProcessor(this);
}
WorkerPool<MessageHolder> workerPool = new WorkerPool<MessageHolder>(this.ringBuffer,
this.ringBuffer.newBarrier(),
loggingExceptionHandler,
handlers);
this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
}
public final MBassador start() {
// workerPool.start(this.executor); ??
this.disruptor.start();
return this;
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this.errorHandlers) {
this.errorHandlers.add(handler);
}
}
@Override
public final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
@Override
public boolean unsubscribe(Object listener) {
return this.subscriptionManager.unsubscribe(listener);
}
@Override
public void subscribe(Object listener) {
this.subscriptionManager.subscribe(listener);
}
@Override
public boolean hasPendingMessages() {
return this.ringBuffer.remainingCapacity() != this.ringBufferSize;
}
@Override
public void shutdown() {
this.disruptor.shutdown();
this.executor.shutdown();
}
@Override
public void publish(Object message) {
try {
publishMessage(message);
Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
boolean handled = false;
if (sub.isVarArg()) {
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(this, vararg);
}
if (!handled) {
sub.publishToSubscription(this, message);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
if (!DeadMessage.class.equals(messageClass.getClass())) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
}
}
@Override
public void publish(Object message1, Object message2) {
try {
publishMessage(message1, message2);
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message1, message2);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
boolean handled = false;
if (sub.isVarArg()) {
Class<?> class1 = message1.getClass();
Class<?> class2 = message2.getClass();
if (!class1.isArray() && class1 == class2) {
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(class1, 2);
vararg[0] = message1;
vararg[1] = message2;
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(this, vararg);
}
}
if (!handled) {
sub.publishToSubscription(this, message1, message2);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message1, message2);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(new Object[] {message1, message2}));
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(Object message1, Object message2, Object message3) {
try {
publishMessage(message1, message2, message3);
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
boolean handled = false;
if (sub.isVarArg()) {
Class<?> class1 = message1.getClass();
Class<?> class2 = message2.getClass();
Class<?> class3 = message3.getClass();
if (!class1.isArray() && class1 == class2 && class2 == class3) {
// messy, but the ONLY way to do it.
if (vararg == null) {
vararg = (Object[]) Array.newInstance(class1, 3);
vararg[0] = message1;
vararg[1] = message2;
vararg[2] = message3;
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(this, vararg);
}
}
if (!handled) {
sub.publishToSubscription(this, message1, message2, message3);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
// cleanup
deadMessage = null;
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(new Object[] {message1, message2, message3}));
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(Object... messages) {
try {
publishMessage(messages);
// cannot have DeadMessage published to this!
int size = messages.length;
boolean allSameType = true;
Class<?>[] messageClasses = new Class[size];
Class<?> first = null;
if (size > 0) {
first = messageClasses[0] = messages[0].getClass();
}
for (int i=1;i<size;i++) {
messageClasses[i] = messages[i].getClass();
if (first != messageClasses[i]) {
allSameType = false;
}
}
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(messages);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
boolean handled = false;
if (first != null && allSameType && sub.isVarArg()) {
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(first, size);
for (int i=0;i<size;i++) {
vararg[i] = messages[i];
}
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(this, vararg);
}
if (!handled) {
sub.publishToSubscription(this, messages);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(messages);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
@ -110,11 +411,6 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
}
}
@Override
public void publishAsync(Object message) {
// put this on the disruptor ring buffer
@ -124,12 +420,82 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.message = message;
eventJob.messageType = MessageType.ONE;
eventJob.message1 = message;
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(Object message1, Object message2) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.TWO;
eventJob.message1 = message1;
eventJob.message2 = message2;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message1, message2));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(Object message1, Object message2, Object message3) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.THREE;
eventJob.message1 = message1;
eventJob.message2 = message2;
eventJob.message3 = message3;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(new Object[] {message1, message2, message3}));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(Object... messages) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.ARRAY;
eventJob.messages = messages;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(messages));
} finally {
// always publish the job
ringBuffer.publish(seq);
@ -150,7 +516,7 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(new Exception("Timeout"))
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
return;
}
}
@ -159,12 +525,86 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.message = message;
eventJob.messageType = MessageType.ONE;
eventJob.message1 = message;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
// Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// to become available.
while (!ringBuffer.hasAvailableCapacity(1)) {
LockSupport.parkNanos(10L);
if (expireTimestamp <= System.currentTimeMillis()) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(new Exception("Timeout"))
.setPublishedObject(message1, message2));
return;
}
}
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.TWO;
eventJob.message1 = message1;
eventJob.message2 = message2;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
// Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// to become available.
while (!ringBuffer.hasAvailableCapacity(1)) {
LockSupport.parkNanos(10L);
if (expireTimestamp <= System.currentTimeMillis()) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(new Exception("Timeout"))
.setPublishedObject(message1, message2, message3));
return;
}
}
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.THREE;
eventJob.message1 = message1;
eventJob.message2 = message2;
eventJob.message3 = message3;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
} finally {
// always publish the job
ringBuffer.publish(seq);
@ -172,13 +612,38 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
}
@Override
public boolean hasPendingMessages() {
return this.ringBuffer.remainingCapacity() != this.ringBufferSize;
}
public void publishAsync(long timeout, TimeUnit unit, Object... messages) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
@Override
public void shutdown() {
this.disruptor.shutdown();
this.executor.shutdown();
// Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// to become available.
while (!ringBuffer.hasAvailableCapacity(1)) {
LockSupport.parkNanos(10L);
if (expireTimestamp <= System.currentTimeMillis()) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(new Exception("Timeout"))
.setPublishedObject(messages));
return;
}
}
// setup the job
final long seq = ringBuffer.next();
try {
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageType.ARRAY;
eventJob.messages = messages;
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(messages));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
}

View File

@ -50,115 +50,160 @@ public interface PubSubSupport {
void publish(Object message1, Object message2);
/**
* Synchronously publish <b>THREE</b> messages to all registered listeners (that match the signature). This includes listeners
* defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
* returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
* Synchronously publish <b>THREE</b> messages to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
*/
void publish(Object message1, Object message2, Object message3);
/**
* Synchronously publish <b>ARBITRARY</b> messages to all registered listeners (that match the signature). This includes listeners
* defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
* returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
* Synchronously publish <b>ARBITRARY or ARRAY</b> messages to all registered listeners (that match the
* signature). This includes listeners defined for super types of the given message type, provided they
* are not configured to reject valid subtypes. The call returns when all matching handlers of all
* registered listeners have been notified (invoked) of the message.
* <p>
* <p>
* <p>
* <b>
* Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an
* array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match!
* </b>
*/
void publish(Object... messages);
/**
* Publish the message asynchronously to all registered listeners (that match the signature). This includes listeners
* defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
* returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
* Publish the message asynchronously to all registered listeners (that match the signature). This includes
* listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes. The call returns when all matching handlers of all registered listeners have been notified
* (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*/
void publishAsync(Object message);
// /**
// * Publish <b>TWO</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue.
// */
// void publishAsync(Object message1, Object message2);
//
// /**
// * Publish <b>THREE</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue.
// */
// void publishAsync(Object message1, Object message2, Object message3);
//
// /**
// * Publish <b>ARBITRARY</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue.
// */
// void publishAsync(Object... messages);
/**
* Publish <b>TWO</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*/
void publishAsync(Object message1, Object message2);
/**
* Publish <b>THREE</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes. The call returns when all matching handlers of all registered listeners have been
* notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*/
void publishAsync(Object message1, Object message2, Object message3);
/**
* Publish <b>ARBITRARY</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes. The call returns when all matching handlers of all registered listeners have been notified
* (invoked) of the message.
* <p>
* <p>
* <b>
* Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an
* array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match!
* </b>
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*/
void publishAsync(Object... messages);
/**
* Publish the message asynchronously to all registered listeners (that match the signature). This includes listeners
* defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
* returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
* Publish the message asynchronously to all registered listeners (that match the signature). This includes
* listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes. The call returns when all matching handlers of all registered listeners have been notified
* (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
*/
void publishAsync(long timeout, TimeUnit unit, Object message);
// /**
// * Publish <b>TWO</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
// */
// void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2);
//
// /**
// * Publish <b>THREE</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
// */
// void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3);
//
// /**
// * Publish <b>ARBITRARY</b> messages asynchronously to all registered listeners (that match the signature). This includes listeners
// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call
// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message.
// * <p>
// * The behavior of this method depends on the configured queuing strategy:
// * <p>
// * If an unbound queuing strategy is used the call returns immediately.
// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
// */
// void publishAsync(long timeout, TimeUnit unit, Object... messages);
/**
* Publish <b>TWO</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
*/
void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2);
/**
* Publish <b>THREE</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes. The call returns when all matching handlers of all registered listeners have been
* notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
*/
void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3);
/**
* Publish <b>ARBITRARY or ARRAY</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes. The call returns when all matching handlers of all registered listeners have been notified
* (invoked) of the message.
* <p>
* <p>
* <b>
* Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an
* array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match!
* </b>
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached.
*/
void publishAsync(long timeout, TimeUnit unit, Object... messages);
}

View File

@ -12,8 +12,6 @@ import java.lang.annotation.Target;
*
* @author bennidi
* Date: 2/8/12
* @author dorkbox, llc
* Date: 2/2/15
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@ -32,14 +30,4 @@ public @interface Handler {
* handlers that have been declared by a superclass but do not apply to the subclass
*/
boolean enabled() default true;
/**
* Var-Arg. Should this handler accept variable arguments?
* <p>
* IE: should <b>foo</b> get dispatched to a handler registered as: <b>blah(String... args){}</b>
* <p>
* <p>
* <b>Normally</b> the only message to be received would be <b>new String[]{"boo", "bar"}</b>
*/
boolean vararg() default false;
}

View File

@ -1,248 +0,0 @@
package net.engio.mbassy.bus;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import net.engio.mbassy.PubSubSupport;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
/**
* The base class for all message bus implementations.
* @author bennidi
* @author dorkbox, llc
* Date: 2/2/15
*/
public abstract class AbstractPubSubSupport implements PubSubSupport, ErrorHandlingSupport {
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
public AbstractPubSubSupport() {
this.subscriptionManager = new SubscriptionManager();
}
@Override
public final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
@Override
public boolean unsubscribe(Object listener) {
return this.subscriptionManager.unsubscribe(listener);
}
@Override
public void subscribe(Object listener) {
this.subscriptionManager.subscribe(listener);
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this.errorHandlers) {
this.errorHandlers.add(handler);
}
}
public void publishMessage(Object message) {
Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
for (Subscription sub : subscriptions) {
Object msg = message;
if (sub.isVarArg()) {
if (!message.getClass().isArray()) {
// messy, but the ONLY way to do it.
Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
msg = vararg;
}
}
sub.publishToSubscription(this, msg);
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
if (!DeadMessage.class.equals(messageClass.getClass())) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
}
}
// cannot have DeadMessage published to this!
public void publishMessage(Object message1, Object message2) {
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
for (Subscription sub : subscriptions) {
boolean handled = false;
if (sub.isVarArg()) {
Class<?> class1 = message1.getClass();
Class<?> class2 = message2.getClass();
if (!class1.isArray() && class1 == class2) {
// messy, but the ONLY way to do it.
Object[] vararg = (Object[]) Array.newInstance(class1.getClass(), 2);
vararg[0] = message1;
vararg[1] = message2;
handled = true;
sub.publishToSubscription(this, vararg);
}
}
if (!handled) {
sub.publishToSubscription(this, message1, message2);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
}
// cannot have DeadMessage published to this!
public void publishMessage(Object message1, Object message2, Object message3) {
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2, message3});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
for (Subscription sub : subscriptions) {
boolean handled = false;
if (sub.isVarArg()) {
Class<?> class1 = message1.getClass();
Class<?> class2 = message2.getClass();
Class<?> class3 = message3.getClass();
if (!class1.isArray() && class1 == class2 && class2 == class3) {
// messy, but the ONLY way to do it.
Object[] vararg = (Object[]) Array.newInstance(class1.getClass(), 3);
vararg[0] = message1;
vararg[1] = message2;
vararg[2] = message3;
handled = true;
sub.publishToSubscription(this, vararg);
}
}
if (!handled) {
sub.publishToSubscription(this, message1, message2, message3);
}
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2});
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
}
// cannot have DeadMessage published to this!
public void publishMessage(Object... messages) {
int size = messages.length;
Class<?>[] messageClasses = new Class[size];
for (int i=0;i<size;i++) {
messageClasses[i] = messages[i].getClass();
}
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses);
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(messages);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
} else {
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, messages);
}
// if the message did not have any listener/handler accept it
if (subscriptions.isEmpty()) {
// cannot have DeadMessage published to this, so no extra check necessary
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(messages);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
}
}
}
}

View File

@ -1,24 +0,0 @@
package net.engio.mbassy.bus;
/**
* The dead message event is published whenever no message
* handlers could be found for a given message publication.
*
* @author bennidi
* Date: 1/18/13
* @author dorkbox, llc
* Date: 2/2/15
*/
public final class DeadMessage {
private Object[] relatedMessages;
DeadMessage(Object[] messages) {
this.relatedMessages = messages;
}
public Object[] getMessages() {
return this.relatedMessages;
}
}

View File

@ -1,38 +1,47 @@
package net.engio.mbassy.disruptor;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.PubSubSupport;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class EventProcessor implements EventHandler<MessageHolder> {
private final AbstractPubSubSupport publisher;
public class EventProcessor implements WorkHandler<MessageHolder> {
private final PubSubSupport publisher;
private final long ordinal;
private final long numberOfConsumers;
public EventProcessor(AbstractPubSubSupport publisher, final long ordinal, final long numberOfConsumers) {
public EventProcessor(PubSubSupport publisher) {
this.publisher = publisher;
this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers;
}
@Override
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
if (sequence % this.numberOfConsumers == this.ordinal) {
try {
this.publisher.publishMessage(event.message);
} catch (Throwable t) {
this.publisher.handlePublicationError(new PublicationError()
.setMessage("Error in asynchronous dispatch")
.setCause(t)
.setPublishedObject(new Object[] {event.message}));
public void onEvent(MessageHolder event) throws Exception {
MessageType messageType = event.messageType;
switch (messageType) {
case ONE: {
this.publisher.publish(event.message1);
event.message1 = null; // cleanup
return;
}
case TWO: {
this.publisher.publish(event.message1, event.message2);
event.message1 = null; // cleanup
event.message2 = null; // cleanup
return;
}
case THREE: {
this.publisher.publish(event.message1, event.message2, event.message3);
event.message1 = null; // cleanup
event.message2 = null; // cleanup
event.message3 = null; // cleanup
return;
}
case ARRAY: {
this.publisher.publish(event.messages);
event.messages = null; // cleanup
return;
}
event.message = null; // cleanup
}
}
}

View File

@ -5,7 +5,12 @@ package net.engio.mbassy.disruptor;
* Date: 2/2/15
*/
public class MessageHolder {
public Object message;
public MessageType messageType = MessageType.ONE;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public MessageHolder() {
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.disruptor;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public enum MessageType {
ONE, TWO, THREE, ARRAY
}

View File

@ -0,0 +1,35 @@
package net.engio.mbassy.disruptor;
import net.engio.mbassy.error.ErrorHandlingSupport;
import net.engio.mbassy.error.PublicationError;
import com.lmax.disruptor.ExceptionHandler;
public final class PublicationExceptionHandler implements ExceptionHandler {
private final ErrorHandlingSupport errorHandler;
public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void handleEventException(final Throwable e, final long sequence, final Object event) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")")
.setCause(e));
}
@Override
public void handleOnStartException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error starting the disruptor")
.setCause(e));
}
@Override
public void handleOnShutdownException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error stopping the disruptor")
.setCause(e));
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus.error;
package net.engio.mbassy.error;
/**
* @author bennidi

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus.error;
package net.engio.mbassy.error;
/**
* Publication error handlers are provided with a publication error every time an

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus.error;
package net.engio.mbassy.error;
/**
* The universal exception type for message bus implementations.
@ -23,6 +23,4 @@ public class MessageBusException extends Exception {
public MessageBusException(Throwable cause) {
super(cause);
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus.error;
package net.engio.mbassy.error;
import java.lang.reflect.Method;
import java.util.Arrays;
@ -81,6 +81,30 @@ public class PublicationError {
return this.publishedObjects;
}
public PublicationError setPublishedObject(Object publishedObject) {
this.publishedObjects = new Object[1];
this.publishedObjects[0] = publishedObject;
return this;
}
public PublicationError setPublishedObject(Object publishedObject1, Object publishedObject2) {
this.publishedObjects = new Object[2];
this.publishedObjects[0] = publishedObject1;
this.publishedObjects[1] = publishedObject2;
return this;
}
public PublicationError setPublishedObject(Object publishedObject1, Object publishedObject2, Object publishedObject3) {
this.publishedObjects = new Object[3];
this.publishedObjects[0] = publishedObject1;
this.publishedObjects[1] = publishedObject2;
this.publishedObjects[2] = publishedObject3;
return this;
}
public PublicationError setPublishedObject(Object[] publishedObjects) {
this.publishedObjects = publishedObjects;
return this;

View File

@ -11,6 +11,15 @@ import net.engio.mbassy.common.ReflectionUtils;
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
* defines such a message listener.
* <p>
* <p>
* Note: When sending messages to a handler that is of type ARRAY (either an object of type array, or a vararg), the JVM cannot
* tell the difference (the message that is being sent), if it is a vararg or array.
* <p>
* <p>
* BECAUSE OF THIS, we always treat the two the same
* <p>
* <p>
*
* @author bennidi
* Date: 11/14/12
@ -24,6 +33,7 @@ public class MessageHandler {
private final boolean acceptsSubtypes;
private final MessageListener listenerConfig;
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
private final boolean acceptsVarArg;
private final boolean isSynchronized;
@ -40,9 +50,12 @@ public class MessageHandler {
this.handler = handler;
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
this.listenerConfig = listenerMetadata;
this.acceptsVarArg = handlerConfig.vararg();
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
this.acceptsVarArg = handledMessages.length == 1 && handledMessages[0].isArray();
}
public <A extends Annotation> A getAnnotation(Class<A> annotationType){
@ -69,6 +82,97 @@ public class MessageHandler {
* @author dorkbox, llc
* Date: 2/2/15
*/
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 1) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType)) {
return false;
}
} else {
if (handledMessages[0] != messageType) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType1, Class<?> messageType2) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 2) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 3) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
if (!handledMessages[2].isAssignableFrom(messageType3)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
if (handledMessages[2] != messageType3) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/

View File

@ -4,10 +4,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.error.ErrorHandlingSupport;
import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.listener.MessageHandler;
/**
@ -132,7 +132,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -141,7 +141,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -149,8 +149,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -158,7 +157,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message}));
.setPublishedObject(message));
}
}
}
@ -188,7 +187,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2}));
.setPublishedObject(message1, message2));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -201,7 +200,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2}));
.setPublishedObject(message1, message2));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -209,8 +208,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2}));
.setPublishedObject(message1, message2));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -218,7 +216,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2}));
.setPublishedObject(message1, message2));
}
}
}
@ -248,7 +246,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2, message3}));
.setPublishedObject(message1, message2, message3));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -263,7 +261,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2, message3}));
.setPublishedObject(message1, message2, message3));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -271,8 +269,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2, message3}));
.setPublishedObject(message1, message2, message3));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -280,7 +277,7 @@ public class Subscription {
.setCause(e)
.setHandler(handler)
.setListener(listener)
.setPublishedObject(new Object[] {message1, message2, message3}));
.setPublishedObject(message1, message2, message3));
}
}
}
@ -328,7 +325,6 @@ public class Subscription {
.setHandler(handler)
.setListener(listener)
.setPublishedObject(messages));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +

View File

@ -11,13 +11,13 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.ObjectTree;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.error.MessageBusException;
import net.engio.mbassy.listener.MessageHandler;
import net.engio.mbassy.listener.MetadataReader;
@ -225,7 +225,7 @@ public class SubscriptionManager {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
if (sub.handlesMessageType(eventSuperType)) {
subscriptions.add(sub);
}
}
@ -235,20 +235,17 @@ public class SubscriptionManager {
///////////////
// a var-arg handler might match
///////////////
// tricky part. We have to check the ARRAY version
// tricky part. We have to check the ARRAY version,
for (Class<?> eventSuperType : types1) {
if (!eventSuperType.isArray()) {
// messy, but the ONLY way to do it.
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
}
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.isVarArg() && sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
subscriptions.add(sub);
}
}
}
@ -286,7 +283,7 @@ public class SubscriptionManager {
Collection<Subscription> subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType1, messageType2)) {
if (sub.handlesMessageType(eventSuperType1, eventSuperType2)) {
subscriptions.add(sub);
}
}
@ -302,18 +299,15 @@ public class SubscriptionManager {
if (messageType1 == messageType2) {
// tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types1) {
if (!eventSuperType.isArray()) {
// messy, but the ONLY way to do it.
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
}
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.isVarArg() && sub.handlesMessageType(messageType1)) {
subscriptions.add(sub);
}
subscriptions.add(sub);
}
}
}
@ -360,7 +354,7 @@ public class SubscriptionManager {
Collection<Subscription> subs = leaf3.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType1, messageType2, messageType3)) {
if (sub.handlesMessageType(eventSuperType1, eventSuperType2, eventSuperType3)) {
subscriptions.add(sub);
}
}
@ -378,18 +372,15 @@ public class SubscriptionManager {
if (messageType1 == messageType2 && messageType2 == messageType3) {
// tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types1) {
if (!eventSuperType.isArray()) {
// messy, but the ONLY way to do it.
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
}
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.isVarArg() && sub.handlesMessageType(messageType1)) {
subscriptions.add(sub);
}
subscriptions.add(sub);
}
}
}
@ -431,7 +422,7 @@ public class SubscriptionManager {
}
// add all subscriptions that match super types
// add all subscriptions that match super types combinations
// have to use recursion for this. BLEH
getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes);
@ -443,18 +434,15 @@ public class SubscriptionManager {
// tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types[0]) {
if (!eventSuperType.isArray()) {
// messy, but the ONLY way to do it.
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
}
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.isVarArg() && sub.handlesMessageType(firstType)) {
subscriptions.add(sub);
}
subscriptions.add(sub);
}
}
}

View File

@ -18,7 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test
public void testSingleThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MBassador();
IMessageBus fifoBUs = new MBassador().start();
List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){
@ -53,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test
public void testSingleThreadedSyncAsyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MBassador(1);
IMessageBus fifoBUs = new MBassador(1).start();
List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){

View File

@ -3,7 +3,6 @@ package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.DeadMessage;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;

View File

@ -2,13 +2,13 @@ package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.MessageManager;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.error.IPublicationErrorHandler;
import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.Listeners;
@ -102,7 +102,7 @@ public class MBassadorTest extends MessageBusTest {
}
};
final MBassador bus = new MBassador();
final MBassador bus = new MBassador().start();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -20,7 +20,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSending(){
IMessageBus bus = new MBassador();
IMessageBus bus = new MBassador().start();
Listener listener = new Listener();
bus.subscribe(listener);
@ -31,45 +31,47 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish("s", "s", "s", "s");
bus.publish(1, 2, "s");
bus.publish(1, 2, 3, 4, 5, 6);
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(count.get(), 5);
assertEquals(count.get(), 10);
}
@SuppressWarnings("unused")
public static class Listener {
@Handler
@SuppressWarnings("unused")
public void handleSync(String o1) {
count.getAndIncrement();
System.err.println("match String");
}
@Handler
@SuppressWarnings("unused")
public void handleSync(String o1, String o2) {
count.getAndIncrement();
System.err.println("match String, String");
}
@Handler
@SuppressWarnings("unused")
public void handleSync(String o1, String o2, String o3) {
count.getAndIncrement();
System.err.println("match String, String, String");
}
@Handler
@SuppressWarnings("unused")
public void handleSync(Integer o1, Integer o2, String o3) {
count.getAndIncrement();
}
@Handler(vararg = true)
@SuppressWarnings("unused")
public void handleSync(String... o) {
count.getAndIncrement();
System.err.println("match Integer, Integer, String");
}
@Handler
public void handleSync(String... o) {
count.getAndIncrement();
System.err.println("match String[]");
}
@Handler
@SuppressWarnings("unused")
public void handleSync(Integer... o) {
count.getAndIncrement();
System.err.println("match Integer[]");
}
}
}

View File

@ -2,12 +2,12 @@ package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.error.IPublicationErrorHandler;
import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.MessagesListener;
@ -113,7 +113,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected IMessageBus getSyncMessageBus() {
return new MBassador();
return new MBassador().start();
}
}

View File

@ -2,7 +2,11 @@ package net.engio.mbassy.common;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Run various tests concurrently. A given instance of runnable will be used to spawn and start
@ -16,55 +20,56 @@ import java.util.concurrent.*;
public class ConcurrentExecutor {
public static void runConcurrent(final Runnable unit, int numberOfConcurrentExecutions) {
Runnable[] units = new Runnable[numberOfConcurrentExecutions];
// create the tasks and schedule for execution
for (int i = 0; i < numberOfConcurrentExecutions; i++) {
units[i] = unit;
}
runConcurrent(units);
}
public static void runConcurrent(final Runnable unit, int numberOfConcurrentExecutions) {
Runnable[] units = new Runnable[numberOfConcurrentExecutions];
// create the tasks and schedule for execution
for (int i = 0; i < numberOfConcurrentExecutions; i++) {
units[i] = unit;
}
runConcurrent(units);
}
public static void runConcurrent(int numberOfConcurrentExecutions, final Runnable... units) {
Runnable[] runnables = new Runnable[numberOfConcurrentExecutions * units.length];
// create the tasks and schedule for execution
for (int i = 0; i < numberOfConcurrentExecutions; i++) {
for(int k = 0; k < units.length; k++)
for(int k = 0; k < units.length; k++) {
runnables[k * numberOfConcurrentExecutions +i] = units[k];
}
}
runConcurrent(runnables);
}
public static void runConcurrent(final Runnable... units) {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> returnValues = new ArrayList<Future<Long>>();
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> returnValues = new ArrayList<Future<Long>>();
// create the tasks and schedule for execution
for (final Runnable unit : units) {
Callable<Long> wrapper = new Callable<Long>() {
@Override
public Long call() throws Exception {
long start = System.currentTimeMillis();
unit.run();
return System.currentTimeMillis() - start;
}
};
returnValues.add(executor.submit(wrapper));
}
// create the tasks and schedule for execution
for (final Runnable unit : units) {
Callable<Long> wrapper = new Callable<Long>() {
@Override
public Long call() throws Exception {
long start = System.currentTimeMillis();
unit.run();
return System.currentTimeMillis() - start;
}
};
returnValues.add(executor.submit(wrapper));
}
// wait until all tasks have been executed
try {
executor.shutdown();// tells the thread pool to execute all waiting tasks
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// unlikely that this will happen
e.printStackTrace();
}
// wait until all tasks have been executed
try {
executor.shutdown();// tells the thread pool to execute all waiting tasks
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// unlikely that this will happen
e.printStackTrace();
}
for(Future task : returnValues){
for(Future<?> task : returnValues){
try {
task.get();
} catch (Exception e) {
@ -72,7 +77,7 @@ public class ConcurrentExecutor {
}
}
}
}
}

View File

@ -2,8 +2,8 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.MBassador;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.error.IPublicationErrorHandler;
import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.messages.MessageTypes;
import org.junit.Before;
@ -44,13 +44,13 @@ public abstract class MessageBusTest extends AssertSupport {
public MBassador createBus() {
MBassador bus = new MBassador();
MBassador bus = new MBassador().start();
bus.addErrorHandler(TestFailingHandler);
return bus;
}
public MBassador createBus(ListenerFactory listeners) {
MBassador bus = new MBassador();
MBassador bus = new MBassador().start();
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
return bus;

View File

@ -25,11 +25,11 @@ public class SubscriptionValidator extends AssertSupport{
this.subscribedListener = subscribedListener;
}
public Expectation listener(Class subscriber){
public Expectation listener(Class<?> subscriber) {
return new Expectation(subscriber);
}
private SubscriptionValidator expect(Class subscriber, Class messageType){
private SubscriptionValidator expect(Class<?> subscriber, Class<?> messageType) {
this.validations.add(new ValidationEntry(messageType, subscriber));
this.messageTypes.add(messageType);
return this;
@ -37,8 +37,8 @@ public class SubscriptionValidator extends AssertSupport{
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager){
for(Class messageType : this.messageTypes){
public void validate(SubscriptionManager manager) {
for(Class<?> messageType : this.messageTypes) {
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
Collection<ValidationEntry> validationEntries = getEntries(messageType);
assertEquals(subscriptions.size(), validationEntries.size());
@ -59,7 +59,7 @@ public class SubscriptionValidator extends AssertSupport{
private Collection<ValidationEntry> getEntries(Class messageType){
private Collection<ValidationEntry> getEntries(Class<?> messageType) {
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
for (ValidationEntry validationValidationEntry : this.validations){
if (validationValidationEntry.messageType.equals(messageType)) {
@ -70,16 +70,16 @@ public class SubscriptionValidator extends AssertSupport{
}
public class Expectation{
public class Expectation {
private Class listener;
private Class<?> listener;
private Expectation(Class listener) {
private Expectation(Class<?> listener) {
this.listener = listener;
}
public SubscriptionValidator handles(Class ...messages){
for(Class message : messages) {
public SubscriptionValidator handles(Class<?> ...messages){
for(Class<?> message : messages) {
expect(this.listener, message);
}
return SubscriptionValidator.this;
@ -87,18 +87,12 @@ public class SubscriptionValidator extends AssertSupport{
}
private class ValidationEntry {
private Class<?> subscriber;
private Class<?> messageType;
private Class subscriber;
private Class messageType;
private ValidationEntry(Class messageType, Class subscriber) {
private ValidationEntry(Class<?> messageType, Class<?> subscriber) {
this.messageType = messageType;
this.subscriber = subscriber;
}
}
}