From 113ffb5e4ac36164c5581b27b47491b3b03823ea Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 7 Feb 2015 16:20:34 +0100 Subject: [PATCH] Added remaining arg 1/2/3 methods. Fixed VarArg array construction for calling handlers that are VarArg. WIP disruptor handling --- .../java/net/engio/mbassy/DeadMessage.java | 42 ++ .../java/net/engio/mbassy/IMessageBus.java | 2 +- src/main/java/net/engio/mbassy/MBassador.java | 541 ++++++++++++++++-- .../java/net/engio/mbassy/PubSubSupport.java | 209 ++++--- .../net/engio/mbassy/annotations/Handler.java | 12 - .../mbassy/bus/AbstractPubSubSupport.java | 248 -------- .../net/engio/mbassy/bus/DeadMessage.java | 24 - .../mbassy/disruptor/EventProcessor.java | 51 +- .../engio/mbassy/disruptor/MessageHolder.java | 7 +- .../engio/mbassy/disruptor/MessageType.java | 9 + .../PublicationExceptionHandler.java | 35 ++ .../{bus => }/error/ErrorHandlingSupport.java | 2 +- .../error/IPublicationErrorHandler.java | 2 +- .../{bus => }/error/MessageBusException.java | 4 +- .../{bus => }/error/PublicationError.java | 26 +- .../engio/mbassy/listener/MessageHandler.java | 106 +++- .../mbassy/subscription/Subscription.java | 32 +- .../subscription/SubscriptionManager.java | 56 +- .../net/engio/mbassy/AsyncFIFOBusTest.java | 4 +- .../net/engio/mbassy/DeadMessageTest.java | 1 - .../java/net/engio/mbassy/MBassadorTest.java | 6 +- .../net/engio/mbassy/MultiMessageTest.java | 28 +- .../java/net/engio/mbassy/SyncBusTest.java | 6 +- .../mbassy/common/ConcurrentExecutor.java | 73 +-- .../engio/mbassy/common/MessageBusTest.java | 8 +- .../mbassy/common/SubscriptionValidator.java | 32 +- 26 files changed, 1001 insertions(+), 565 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/DeadMessage.java delete mode 100644 src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java delete mode 100644 src/main/java/net/engio/mbassy/bus/DeadMessage.java create mode 100644 src/main/java/net/engio/mbassy/disruptor/MessageType.java create mode 100644 src/main/java/net/engio/mbassy/disruptor/PublicationExceptionHandler.java rename src/main/java/net/engio/mbassy/{bus => }/error/ErrorHandlingSupport.java (94%) rename src/main/java/net/engio/mbassy/{bus => }/error/IPublicationErrorHandler.java (97%) rename src/main/java/net/engio/mbassy/{bus => }/error/MessageBusException.java (93%) rename src/main/java/net/engio/mbassy/{bus => }/error/PublicationError.java (74%) diff --git a/src/main/java/net/engio/mbassy/DeadMessage.java b/src/main/java/net/engio/mbassy/DeadMessage.java new file mode 100644 index 0000000..659675f --- /dev/null +++ b/src/main/java/net/engio/mbassy/DeadMessage.java @@ -0,0 +1,42 @@ +package net.engio.mbassy; + +/** + * The dead message event is published whenever no message + * handlers could be found for a given message publication. + * + * @author bennidi + * Date: 1/18/13 + * @author dorkbox, llc + * Date: 2/2/15 + */ +final class DeadMessage { + + private Object[] relatedMessages; + + + DeadMessage(Object message) { + this.relatedMessages = new Object[1]; + this.relatedMessages[0] = message; + } + + DeadMessage(Object message1, Object message2) { + this.relatedMessages = new Object[2]; + this.relatedMessages[0] = message1; + this.relatedMessages[1] = message2; + } + + DeadMessage(Object message1, Object message2, Object message3 ) { + this.relatedMessages = new Object[3]; + this.relatedMessages[0] = message1; + this.relatedMessages[1] = message2; + this.relatedMessages[2] = message3; + } + + DeadMessage(Object[] messages) { + this.relatedMessages = messages; + } + + public Object[] getMessages() { + return this.relatedMessages; + } +} diff --git a/src/main/java/net/engio/mbassy/IMessageBus.java b/src/main/java/net/engio/mbassy/IMessageBus.java index cbfd76c..6d3a72c 100644 --- a/src/main/java/net/engio/mbassy/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/IMessageBus.java @@ -1,6 +1,6 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.error.ErrorHandlingSupport; +import net.engio.mbassy.error.ErrorHandlingSupport; /** * A message bus offers facilities for publishing messages to the message handlers of registered listeners. diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java index eee7caa..40e2f4a 100644 --- a/src/main/java/net/engio/mbassy/MBassador.java +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -1,19 +1,29 @@ package net.engio.mbassy; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import net.engio.mbassy.bus.AbstractPubSubSupport; -import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.DisruptorThreadFactory; import net.engio.mbassy.disruptor.EventBusFactory; import net.engio.mbassy.disruptor.EventProcessor; import net.engio.mbassy.disruptor.MessageHolder; +import net.engio.mbassy.disruptor.MessageType; +import net.engio.mbassy.disruptor.PublicationExceptionHandler; +import net.engio.mbassy.error.IPublicationErrorHandler; +import net.engio.mbassy.error.PublicationError; +import net.engio.mbassy.subscription.Subscription; +import net.engio.mbassy.subscription.SubscriptionManager; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.WorkHandler; +import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -24,7 +34,13 @@ import com.lmax.disruptor.dsl.ProducerType; * @author dorkbox, llc * Date: 2/2/15 */ -public class MBassador extends AbstractPubSubSupport implements IMessageBus { +public class MBassador implements IMessageBus { + + // error handling is first-class functionality + // this handler will receive all errors that occur during message dispatch or message handling + private final List errorHandlers = new ArrayList(); + + private final SubscriptionManager subscriptionManager; // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down private final ExecutorService executor = Executors.newCachedThreadPool(new DisruptorThreadFactory()); @@ -42,66 +58,351 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { public MBassador(int numberOfThreads) { - super(); - if (numberOfThreads < 1) { numberOfThreads = 1; // at LEAST 1 thread. } + this.subscriptionManager = new SubscriptionManager(); EventBusFactory factory = new EventBusFactory(); - EventProcessor procs[] = new EventProcessor[numberOfThreads]; - for (int i = 0; i < procs.length; i++) { - procs[i] = new EventProcessor(this, i, procs.length); - } + PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); this.disruptor = new Disruptor(factory, this.ringBufferSize, this.executor, ProducerType.MULTI, new SleepingWaitStrategy()); + this.ringBuffer = this.disruptor.getRingBuffer(); - // tell the disruptor to handle procs first - this.disruptor.handleEventsWith(procs); - this.ringBuffer = this.disruptor.start(); + WorkHandler handlers[] = new EventProcessor[numberOfThreads]; + for (int i = 0; i < handlers.length; i++) { + handlers[i] = new EventProcessor(this); + } + + WorkerPool workerPool = new WorkerPool(this.ringBuffer, + this.ringBuffer.newBarrier(), + loggingExceptionHandler, + handlers); + + this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); + } + + public final MBassador start() { +// workerPool.start(this.executor); ?? + this.disruptor.start(); + return this; + } + + + @Override + public final void addErrorHandler(IPublicationErrorHandler handler) { + synchronized (this.errorHandlers) { + this.errorHandlers.add(handler); + } + } + + @Override + public final void handlePublicationError(PublicationError error) { + for (IPublicationErrorHandler errorHandler : this.errorHandlers) { + errorHandler.handleError(error); + } + } + + @Override + public boolean unsubscribe(Object listener) { + return this.subscriptionManager.unsubscribe(listener); + } + + + @Override + public void subscribe(Object listener) { + this.subscriptionManager.subscribe(listener); + } + + @Override + public boolean hasPendingMessages() { + return this.ringBuffer.remainingCapacity() != this.ringBufferSize; + } + + @Override + public void shutdown() { + this.disruptor.shutdown(); + this.executor.shutdown(); } @Override public void publish(Object message) { try { - publishMessage(message); + Class messageClass = message.getClass(); + + SubscriptionManager manager = this.subscriptionManager; + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + + if (subscriptions == null || subscriptions.isEmpty()) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } else { + Object[] vararg = null; + + for (Subscription sub : subscriptions) { + boolean handled = false; + if (sub.isVarArg()) { + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; + + Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); + newInstance[0] = vararg; + vararg = newInstance; + } + + handled = true; + sub.publishToSubscription(this, vararg); + } + + if (!handled) { + sub.publishToSubscription(this, message); + } + } + + // if the message did not have any listener/handler accept it + if (subscriptions.isEmpty()) { + if (!DeadMessage.class.equals(messageClass.getClass())) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } + } + } } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") .setCause(e) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); } } @Override public void publish(Object message1, Object message2) { try { - publishMessage(message1, message2); + Class messageClass1 = message1.getClass(); + Class messageClass2 = message2.getClass(); + + SubscriptionManager manager = this.subscriptionManager; + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); + + if (subscriptions == null || subscriptions.isEmpty()) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message1, message2); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } else { + Object[] vararg = null; + + for (Subscription sub : subscriptions) { + boolean handled = false; + if (sub.isVarArg()) { + Class class1 = message1.getClass(); + Class class2 = message2.getClass(); + if (!class1.isArray() && class1 == class2) { + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(class1, 2); + vararg[0] = message1; + vararg[1] = message2; + + Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); + newInstance[0] = vararg; + vararg = newInstance; + } + + handled = true; + sub.publishToSubscription(this, vararg); + } + } + + if (!handled) { + sub.publishToSubscription(this, message1, message2); + } + } + + // if the message did not have any listener/handler accept it + if (subscriptions.isEmpty()) { + // cannot have DeadMessage published to this, so no extra check necessary + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(message1, message2); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } + } } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") .setCause(e) - .setPublishedObject(new Object[] {message1, message2})); + .setPublishedObject(message1, message2)); } } @Override public void publish(Object message1, Object message2, Object message3) { try { - publishMessage(message1, message2, message3); + Class messageClass1 = message1.getClass(); + Class messageClass2 = message2.getClass(); + Class messageClass3 = message3.getClass(); + + SubscriptionManager manager = this.subscriptionManager; + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); + + if (subscriptions == null || subscriptions.isEmpty()) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } else { + Object[] vararg = null; + + for (Subscription sub : subscriptions) { + boolean handled = false; + if (sub.isVarArg()) { + Class class1 = message1.getClass(); + Class class2 = message2.getClass(); + Class class3 = message3.getClass(); + if (!class1.isArray() && class1 == class2 && class2 == class3) { + // messy, but the ONLY way to do it. + if (vararg == null) { + vararg = (Object[]) Array.newInstance(class1, 3); + vararg[0] = message1; + vararg[1] = message2; + vararg[2] = message3; + + Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); + newInstance[0] = vararg; + vararg = newInstance; + } + + handled = true; + sub.publishToSubscription(this, vararg); + } + } + + if (!handled) { + sub.publishToSubscription(this, message1, message2, message3); + } + } + + // if the message did not have any listener/handler accept it + if (subscriptions.isEmpty()) { + // cannot have DeadMessage published to this, so no extra check necessary + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + + // cleanup + deadMessage = null; + } + } } catch (Throwable e) { handlePublicationError(new PublicationError() - .setMessage("Error during publication of message") - .setCause(e) - .setPublishedObject(new Object[] {message1, message2, message3})); + .setMessage("Error during publication of message") + .setCause(e) + .setPublishedObject(message1, message2, message3)); } } @Override public void publish(Object... messages) { try { - publishMessage(messages); + // cannot have DeadMessage published to this! + int size = messages.length; + boolean allSameType = true; + + Class[] messageClasses = new Class[size]; + Class first = null; + if (size > 0) { + first = messageClasses[0] = messages[0].getClass(); + } + + for (int i=1;i subscriptions = manager.getSubscriptionsByMessageType(messageClasses); + + if (subscriptions == null || subscriptions.isEmpty()) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(messages); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } else { + Object[] vararg = null; + + for (Subscription sub : subscriptions) { + boolean handled = false; + if (first != null && allSameType && sub.isVarArg()) { + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(first, size); + + for (int i=0;i ringBuffer = this.ringBuffer; + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.TWO; + eventJob.message1 = message1; + eventJob.message2 = message2; } catch (Exception e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") .setCause(e) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message1, message2)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + } + + @Override + public void publishAsync(Object message1, Object message2, Object message3) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.THREE; + eventJob.message1 = message1; + eventJob.message2 = message2; + eventJob.message3 = message3; + } catch (Exception e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(new Object[] {message1, message2, message3})); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + } + + @Override + public void publishAsync(Object... messages) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.ARRAY; + eventJob.messages = messages; + } catch (Exception e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(messages)); } finally { // always publish the job ringBuffer.publish(seq); @@ -150,7 +516,7 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") .setCause(new Exception("Timeout")) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); return; } } @@ -159,12 +525,86 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { final long seq = ringBuffer.next(); try { MessageHolder eventJob = ringBuffer.get(seq); - eventJob.message = message; + eventJob.messageType = MessageType.ONE; + eventJob.message1 = message; } catch (Exception e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") .setCause(e) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + } + @Override + public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); + + // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space + // to become available. + while (!ringBuffer.hasAvailableCapacity(1)) { + LockSupport.parkNanos(10L); + if (expireTimestamp <= System.currentTimeMillis()) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(new Exception("Timeout")) + .setPublishedObject(message1, message2)); + return; + } + } + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.TWO; + eventJob.message1 = message1; + eventJob.message2 = message2; + } catch (Exception e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + } + @Override + public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); + + // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space + // to become available. + while (!ringBuffer.hasAvailableCapacity(1)) { + LockSupport.parkNanos(10L); + if (expireTimestamp <= System.currentTimeMillis()) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(new Exception("Timeout")) + .setPublishedObject(message1, message2, message3)); + return; + } + } + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.THREE; + eventJob.message1 = message1; + eventJob.message2 = message2; + eventJob.message3 = message3; + } catch (Exception e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2, message3)); } finally { // always publish the job ringBuffer.publish(seq); @@ -172,13 +612,38 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { } @Override - public boolean hasPendingMessages() { - return this.ringBuffer.remainingCapacity() != this.ringBufferSize; - } + public void publishAsync(long timeout, TimeUnit unit, Object... messages) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); - @Override - public void shutdown() { - this.disruptor.shutdown(); - this.executor.shutdown(); + // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space + // to become available. + while (!ringBuffer.hasAvailableCapacity(1)) { + LockSupport.parkNanos(10L); + if (expireTimestamp <= System.currentTimeMillis()) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(new Exception("Timeout")) + .setPublishedObject(messages)); + return; + } + } + + // setup the job + final long seq = ringBuffer.next(); + try { + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.ARRAY; + eventJob.messages = messages; + } catch (Exception e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(messages)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } } } diff --git a/src/main/java/net/engio/mbassy/PubSubSupport.java b/src/main/java/net/engio/mbassy/PubSubSupport.java index aca2aa0..01e234f 100644 --- a/src/main/java/net/engio/mbassy/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/PubSubSupport.java @@ -50,115 +50,160 @@ public interface PubSubSupport { void publish(Object message1, Object message2); /** - * Synchronously publish THREE messages to all registered listeners (that match the signature). This includes listeners - * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call - * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. + * Synchronously publish THREE messages to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have + * been notified (invoked) of the message. */ void publish(Object message1, Object message2, Object message3); /** - * Synchronously publish ARBITRARY messages to all registered listeners (that match the signature). This includes listeners - * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call - * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. + * Synchronously publish ARBITRARY or ARRAY messages to all registered listeners (that match the + * signature). This includes listeners defined for super types of the given message type, provided they + * are not configured to reject valid subtypes. The call returns when all matching handlers of all + * registered listeners have been notified (invoked) of the message. + *

+ *

+ *

+ * + * Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an + * array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match! + * */ void publish(Object... messages); /** - * Publish the message asynchronously to all registered listeners (that match the signature). This includes listeners - * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call - * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. + * Publish the message asynchronously to all registered listeners (that match the signature). This includes + * listeners defined for super types of the given message type, provided they are not configured to reject + * valid subtypes. The call returns when all matching handlers of all registered listeners have been notified + * (invoked) of the message. + *

*

* The behavior of this method depends on the configured queuing strategy: *

+ *

* If an unbound queuing strategy is used the call returns immediately. * If a bounded queue is used the call might block until the message can be placed in the queue. */ void publishAsync(Object message); -// /** -// * Publish TWO messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue. -// */ -// void publishAsync(Object message1, Object message2); -// -// /** -// * Publish THREE messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue. -// */ -// void publishAsync(Object message1, Object message2, Object message3); -// -// /** -// * Publish ARBITRARY messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue. -// */ -// void publishAsync(Object... messages); + /** + * Publish TWO messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have + * been notified (invoked) of the message. + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + */ + void publishAsync(Object message1, Object message2); + + /** + * Publish THREE messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured to + * reject valid subtypes. The call returns when all matching handlers of all registered listeners have been + * notified (invoked) of the message. + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + */ + void publishAsync(Object message1, Object message2, Object message3); + + /** + * Publish ARBITRARY messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured to reject + * valid subtypes. The call returns when all matching handlers of all registered listeners have been notified + * (invoked) of the message. + *

+ *

+ * + * Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an + * array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match! + * + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + */ + void publishAsync(Object... messages); /** - * Publish the message asynchronously to all registered listeners (that match the signature). This includes listeners - * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call - * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. + * Publish the message asynchronously to all registered listeners (that match the signature). This includes + * listeners defined for super types of the given message type, provided they are not configured to reject + * valid subtypes. The call returns when all matching handlers of all registered listeners have been notified + * (invoked) of the message. + *

*

* The behavior of this method depends on the configured queuing strategy: *

+ *

* If an unbound queuing strategy is used the call returns immediately. * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. */ void publishAsync(long timeout, TimeUnit unit, Object message); -// /** -// * Publish TWO messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. -// */ -// void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2); -// -// /** -// * Publish THREE messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. -// */ -// void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3); -// -// /** -// * Publish ARBITRARY messages asynchronously to all registered listeners (that match the signature). This includes listeners -// * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call -// * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. -// *

-// * The behavior of this method depends on the configured queuing strategy: -// *

-// * If an unbound queuing strategy is used the call returns immediately. -// * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. -// */ -// void publishAsync(long timeout, TimeUnit unit, Object... messages); + /** + * Publish TWO messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have + * been notified (invoked) of the message. + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. + */ + void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2); + + /** + * Publish THREE messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured to + * reject valid subtypes. The call returns when all matching handlers of all registered listeners have been + * notified (invoked) of the message. + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. + */ + void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3); + + /** + * Publish ARBITRARY or ARRAY messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured to reject + * valid subtypes. The call returns when all matching handlers of all registered listeners have been notified + * (invoked) of the message. + *

+ *

+ * + * Note, that there is NO DIFFERENCE in the java language between VarArg and Array! If it's an + * array, IT IS ALSO VARARG. You must explicitly check arrays to make sure they match! + * + *

+ *

+ * The behavior of this method depends on the configured queuing strategy: + *

+ *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue or the timeout is reached. + */ + void publishAsync(long timeout, TimeUnit unit, Object... messages); } diff --git a/src/main/java/net/engio/mbassy/annotations/Handler.java b/src/main/java/net/engio/mbassy/annotations/Handler.java index 97c1095..c740b11 100644 --- a/src/main/java/net/engio/mbassy/annotations/Handler.java +++ b/src/main/java/net/engio/mbassy/annotations/Handler.java @@ -12,8 +12,6 @@ import java.lang.annotation.Target; * * @author bennidi * Date: 2/8/12 - * @author dorkbox, llc - * Date: 2/2/15 */ @Retention(value = RetentionPolicy.RUNTIME) @Inherited @@ -32,14 +30,4 @@ public @interface Handler { * handlers that have been declared by a superclass but do not apply to the subclass */ boolean enabled() default true; - - /** - * Var-Arg. Should this handler accept variable arguments? - *

- * IE: should foo get dispatched to a handler registered as: blah(String... args){} - *

- *

- * Normally the only message to be received would be new String[]{"boo", "bar"} - */ - boolean vararg() default false; } diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java deleted file mode 100644 index 108220a..0000000 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ /dev/null @@ -1,248 +0,0 @@ -package net.engio.mbassy.bus; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import net.engio.mbassy.PubSubSupport; -import net.engio.mbassy.bus.error.ErrorHandlingSupport; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.subscription.Subscription; -import net.engio.mbassy.subscription.SubscriptionManager; - -/** - * The base class for all message bus implementations. - * @author bennidi - * @author dorkbox, llc - * Date: 2/2/15 - */ -public abstract class AbstractPubSubSupport implements PubSubSupport, ErrorHandlingSupport { - - // error handling is first-class functionality - // this handler will receive all errors that occur during message dispatch or message handling - private final List errorHandlers = new ArrayList(); - - private final SubscriptionManager subscriptionManager; - - - public AbstractPubSubSupport() { - this.subscriptionManager = new SubscriptionManager(); - } - - @Override - public final void handlePublicationError(PublicationError error) { - for (IPublicationErrorHandler errorHandler : this.errorHandlers) { - errorHandler.handleError(error); - } - } - - @Override - public boolean unsubscribe(Object listener) { - return this.subscriptionManager.unsubscribe(listener); - } - - - @Override - public void subscribe(Object listener) { - this.subscriptionManager.subscribe(listener); - } - - - @Override - public final void addErrorHandler(IPublicationErrorHandler handler) { - synchronized (this.errorHandlers) { - this.errorHandlers.add(handler); - } - } - - public void publishMessage(Object message) { - Class messageClass = message.getClass(); - - SubscriptionManager manager = this.subscriptionManager; - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - - if (subscriptions == null || subscriptions.isEmpty()) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } else { - for (Subscription sub : subscriptions) { - Object msg = message; - if (sub.isVarArg()) { - if (!message.getClass().isArray()) { - // messy, but the ONLY way to do it. - Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; - msg = vararg; - } - } - sub.publishToSubscription(this, msg); - } - - // if the message did not have any listener/handler accept it - if (subscriptions.isEmpty()) { - if (!DeadMessage.class.equals(messageClass.getClass())) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } - } - } - } - - // cannot have DeadMessage published to this! - public void publishMessage(Object message1, Object message2) { - Class messageClass1 = message1.getClass(); - Class messageClass2 = message2.getClass(); - - SubscriptionManager manager = this.subscriptionManager; - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); - - if (subscriptions == null || subscriptions.isEmpty()) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } else { - for (Subscription sub : subscriptions) { - boolean handled = false; - if (sub.isVarArg()) { - Class class1 = message1.getClass(); - Class class2 = message2.getClass(); - if (!class1.isArray() && class1 == class2) { - // messy, but the ONLY way to do it. - Object[] vararg = (Object[]) Array.newInstance(class1.getClass(), 2); - vararg[0] = message1; - vararg[1] = message2; - - handled = true; - sub.publishToSubscription(this, vararg); - } - } - - if (!handled) { - sub.publishToSubscription(this, message1, message2); - } - } - - // if the message did not have any listener/handler accept it - if (subscriptions.isEmpty()) { - // cannot have DeadMessage published to this, so no extra check necessary - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } - } - } - - // cannot have DeadMessage published to this! - public void publishMessage(Object message1, Object message2, Object message3) { - Class messageClass1 = message1.getClass(); - Class messageClass2 = message2.getClass(); - Class messageClass3 = message3.getClass(); - - SubscriptionManager manager = this.subscriptionManager; - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); - - if (subscriptions == null || subscriptions.isEmpty()) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2, message3}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } else { - for (Subscription sub : subscriptions) { - boolean handled = false; - if (sub.isVarArg()) { - Class class1 = message1.getClass(); - Class class2 = message2.getClass(); - Class class3 = message3.getClass(); - if (!class1.isArray() && class1 == class2 && class2 == class3) { - // messy, but the ONLY way to do it. - Object[] vararg = (Object[]) Array.newInstance(class1.getClass(), 3); - vararg[0] = message1; - vararg[1] = message2; - vararg[2] = message3; - - handled = true; - sub.publishToSubscription(this, vararg); - } - } - - if (!handled) { - sub.publishToSubscription(this, message1, message2, message3); - } - } - - // if the message did not have any listener/handler accept it - if (subscriptions.isEmpty()) { - // cannot have DeadMessage published to this, so no extra check necessary - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(new Object[] {message1, message2}); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } - } - } - - // cannot have DeadMessage published to this! - public void publishMessage(Object... messages) { - int size = messages.length; - Class[] messageClasses = new Class[size]; - for (int i=0;i subscriptions = manager.getSubscriptionsByMessageType(messageClasses); - - if (subscriptions == null || subscriptions.isEmpty()) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(messages); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } else { - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, messages); - } - - // if the message did not have any listener/handler accept it - if (subscriptions.isEmpty()) { - // cannot have DeadMessage published to this, so no extra check necessary - // Dead Event - - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(messages); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } - } - } - } -} diff --git a/src/main/java/net/engio/mbassy/bus/DeadMessage.java b/src/main/java/net/engio/mbassy/bus/DeadMessage.java deleted file mode 100644 index 67b51ac..0000000 --- a/src/main/java/net/engio/mbassy/bus/DeadMessage.java +++ /dev/null @@ -1,24 +0,0 @@ -package net.engio.mbassy.bus; - -/** - * The dead message event is published whenever no message - * handlers could be found for a given message publication. - * - * @author bennidi - * Date: 1/18/13 - * @author dorkbox, llc - * Date: 2/2/15 - */ -public final class DeadMessage { - - private Object[] relatedMessages; - - - DeadMessage(Object[] messages) { - this.relatedMessages = messages; - } - - public Object[] getMessages() { - return this.relatedMessages; - } -} diff --git a/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java b/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java index 869454b..c8e7dbc 100644 --- a/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java +++ b/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java @@ -1,38 +1,47 @@ package net.engio.mbassy.disruptor; -import net.engio.mbassy.bus.AbstractPubSubSupport; -import net.engio.mbassy.bus.error.PublicationError; +import net.engio.mbassy.PubSubSupport; -import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.WorkHandler; /** * @author dorkbox, llc * Date: 2/2/15 */ -public class EventProcessor implements EventHandler { - private final AbstractPubSubSupport publisher; +public class EventProcessor implements WorkHandler { + private final PubSubSupport publisher; - private final long ordinal; - private final long numberOfConsumers; - - public EventProcessor(AbstractPubSubSupport publisher, final long ordinal, final long numberOfConsumers) { + public EventProcessor(PubSubSupport publisher) { this.publisher = publisher; - this.ordinal = ordinal; - this.numberOfConsumers = numberOfConsumers; } @Override - public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { - if (sequence % this.numberOfConsumers == this.ordinal) { - try { - this.publisher.publishMessage(event.message); - } catch (Throwable t) { - this.publisher.handlePublicationError(new PublicationError() - .setMessage("Error in asynchronous dispatch") - .setCause(t) - .setPublishedObject(new Object[] {event.message})); + public void onEvent(MessageHolder event) throws Exception { + MessageType messageType = event.messageType; + switch (messageType) { + case ONE: { + this.publisher.publish(event.message1); + event.message1 = null; // cleanup + return; + } + case TWO: { + this.publisher.publish(event.message1, event.message2); + event.message1 = null; // cleanup + event.message2 = null; // cleanup + return; + } + case THREE: { + this.publisher.publish(event.message1, event.message2, event.message3); + event.message1 = null; // cleanup + event.message2 = null; // cleanup + event.message3 = null; // cleanup + return; + } + case ARRAY: { + this.publisher.publish(event.messages); + event.messages = null; // cleanup + return; } - event.message = null; // cleanup } } } \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java b/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java index bb95735..8b2e3cc 100644 --- a/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java +++ b/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java @@ -5,7 +5,12 @@ package net.engio.mbassy.disruptor; * Date: 2/2/15 */ public class MessageHolder { - public Object message; + public MessageType messageType = MessageType.ONE; + + public Object message1 = null; + public Object message2 = null; + public Object message3 = null; + public Object[] messages = null; public MessageHolder() { } diff --git a/src/main/java/net/engio/mbassy/disruptor/MessageType.java b/src/main/java/net/engio/mbassy/disruptor/MessageType.java new file mode 100644 index 0000000..40141ef --- /dev/null +++ b/src/main/java/net/engio/mbassy/disruptor/MessageType.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.disruptor; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public enum MessageType { + ONE, TWO, THREE, ARRAY +} diff --git a/src/main/java/net/engio/mbassy/disruptor/PublicationExceptionHandler.java b/src/main/java/net/engio/mbassy/disruptor/PublicationExceptionHandler.java new file mode 100644 index 0000000..08d4bd9 --- /dev/null +++ b/src/main/java/net/engio/mbassy/disruptor/PublicationExceptionHandler.java @@ -0,0 +1,35 @@ +package net.engio.mbassy.disruptor; + +import net.engio.mbassy.error.ErrorHandlingSupport; +import net.engio.mbassy.error.PublicationError; + +import com.lmax.disruptor.ExceptionHandler; + +public final class PublicationExceptionHandler implements ExceptionHandler { + private final ErrorHandlingSupport errorHandler; + + public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) { + this.errorHandler = errorHandler; + } + + @Override + public void handleEventException(final Throwable e, final long sequence, final Object event) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")") + .setCause(e)); + } + + @Override + public void handleOnStartException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error starting the disruptor") + .setCause(e)); + } + + @Override + public void handleOnShutdownException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error stopping the disruptor") + .setCause(e)); + } +} diff --git a/src/main/java/net/engio/mbassy/bus/error/ErrorHandlingSupport.java b/src/main/java/net/engio/mbassy/error/ErrorHandlingSupport.java similarity index 94% rename from src/main/java/net/engio/mbassy/bus/error/ErrorHandlingSupport.java rename to src/main/java/net/engio/mbassy/error/ErrorHandlingSupport.java index 1b5677c..3f49421 100644 --- a/src/main/java/net/engio/mbassy/bus/error/ErrorHandlingSupport.java +++ b/src/main/java/net/engio/mbassy/error/ErrorHandlingSupport.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus.error; +package net.engio.mbassy.error; /** * @author bennidi diff --git a/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java b/src/main/java/net/engio/mbassy/error/IPublicationErrorHandler.java similarity index 97% rename from src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java rename to src/main/java/net/engio/mbassy/error/IPublicationErrorHandler.java index 92bc176..2fbe526 100644 --- a/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java +++ b/src/main/java/net/engio/mbassy/error/IPublicationErrorHandler.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus.error; +package net.engio.mbassy.error; /** * Publication error handlers are provided with a publication error every time an diff --git a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java b/src/main/java/net/engio/mbassy/error/MessageBusException.java similarity index 93% rename from src/main/java/net/engio/mbassy/bus/error/MessageBusException.java rename to src/main/java/net/engio/mbassy/error/MessageBusException.java index ffde43c..9e69c3b 100644 --- a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java +++ b/src/main/java/net/engio/mbassy/error/MessageBusException.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus.error; +package net.engio.mbassy.error; /** * The universal exception type for message bus implementations. @@ -23,6 +23,4 @@ public class MessageBusException extends Exception { public MessageBusException(Throwable cause) { super(cause); } - - } diff --git a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java b/src/main/java/net/engio/mbassy/error/PublicationError.java similarity index 74% rename from src/main/java/net/engio/mbassy/bus/error/PublicationError.java rename to src/main/java/net/engio/mbassy/error/PublicationError.java index 70943f9..7765896 100644 --- a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java +++ b/src/main/java/net/engio/mbassy/error/PublicationError.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus.error; +package net.engio.mbassy.error; import java.lang.reflect.Method; import java.util.Arrays; @@ -81,6 +81,30 @@ public class PublicationError { return this.publishedObjects; } + public PublicationError setPublishedObject(Object publishedObject) { + this.publishedObjects = new Object[1]; + this.publishedObjects[0] = publishedObject; + + return this; + } + + public PublicationError setPublishedObject(Object publishedObject1, Object publishedObject2) { + this.publishedObjects = new Object[2]; + this.publishedObjects[0] = publishedObject1; + this.publishedObjects[1] = publishedObject2; + + return this; + } + + public PublicationError setPublishedObject(Object publishedObject1, Object publishedObject2, Object publishedObject3) { + this.publishedObjects = new Object[3]; + this.publishedObjects[0] = publishedObject1; + this.publishedObjects[1] = publishedObject2; + this.publishedObjects[2] = publishedObject3; + + return this; + } + public PublicationError setPublishedObject(Object[] publishedObjects) { this.publishedObjects = publishedObjects; return this; diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/listener/MessageHandler.java index d915a80..9637d29 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandler.java @@ -11,6 +11,15 @@ import net.engio.mbassy.common.ReflectionUtils; * Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains * the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy * defines such a message listener. + *

+ *

+ * Note: When sending messages to a handler that is of type ARRAY (either an object of type array, or a vararg), the JVM cannot + * tell the difference (the message that is being sent), if it is a vararg or array. + *

+ *

+ * BECAUSE OF THIS, we always treat the two the same + *

+ *

* * @author bennidi * Date: 11/14/12 @@ -24,6 +33,7 @@ public class MessageHandler { private final boolean acceptsSubtypes; private final MessageListener listenerConfig; + // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! private final boolean acceptsVarArg; private final boolean isSynchronized; @@ -40,9 +50,12 @@ public class MessageHandler { this.handler = handler; this.acceptsSubtypes = !handlerConfig.rejectSubtypes(); this.listenerConfig = listenerMetadata; - this.acceptsVarArg = handlerConfig.vararg(); this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null; this.handledMessages = handledMessages; + + + // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! + this.acceptsVarArg = handledMessages.length == 1 && handledMessages[0].isArray(); } public A getAnnotation(Class annotationType){ @@ -69,6 +82,97 @@ public class MessageHandler { * @author dorkbox, llc * Date: 2/2/15 */ + + /** + * @return true if the message types are handled + */ + public boolean handlesMessage(Class messageType) { + Class[] handledMessages = this.handledMessages; + int handledLength = handledMessages.length; + + if (handledLength != 1) { + return false; + } + + if (this.acceptsSubtypes) { + if (!handledMessages[0].isAssignableFrom(messageType)) { + return false; + } + } else { + if (handledMessages[0] != messageType) { + return false; + } + } + + return true; + } + + /** + * @return true if the message types are handled + */ + public boolean handlesMessage(Class messageType1, Class messageType2) { + Class[] handledMessages = this.handledMessages; + int handledLength = handledMessages.length; + + if (handledLength != 2) { + return false; + } + + if (this.acceptsSubtypes) { + if (!handledMessages[0].isAssignableFrom(messageType1)) { + return false; + } + if (!handledMessages[1].isAssignableFrom(messageType2)) { + return false; + } + } else { + if (handledMessages[0] != messageType1) { + return false; + } + if (handledMessages[1] != messageType2) { + return false; + } + } + + return true; + } + + /** + * @return true if the message types are handled + */ + public boolean handlesMessage(Class messageType1, Class messageType2, Class messageType3) { + Class[] handledMessages = this.handledMessages; + int handledLength = handledMessages.length; + + if (handledLength != 3) { + return false; + } + + if (this.acceptsSubtypes) { + if (!handledMessages[0].isAssignableFrom(messageType1)) { + return false; + } + if (!handledMessages[1].isAssignableFrom(messageType2)) { + return false; + } + if (!handledMessages[2].isAssignableFrom(messageType3)) { + return false; + } + } else { + if (handledMessages[0] != messageType1) { + return false; + } + if (handledMessages[1] != messageType2) { + return false; + } + if (handledMessages[2] != messageType3) { + return false; + } + } + + return true; + } + /** * @return true if the message types are handled */ diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index 5a13ff4..d15dd0b 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -4,10 +4,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; -import net.engio.mbassy.bus.error.ErrorHandlingSupport; -import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.dispatch.IHandlerInvocation; +import net.engio.mbassy.error.ErrorHandlingSupport; +import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.listener.MessageHandler; /** @@ -132,7 +132,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); } catch (IllegalArgumentException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -141,7 +141,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); } catch (InvocationTargetException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -149,8 +149,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message})); - + .setPublishedObject(message)); } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -158,7 +157,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message})); + .setPublishedObject(message)); } } } @@ -188,7 +187,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2})); + .setPublishedObject(message1, message2)); } catch (IllegalArgumentException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -201,7 +200,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2})); + .setPublishedObject(message1, message2)); } catch (InvocationTargetException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -209,8 +208,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2})); - + .setPublishedObject(message1, message2)); } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -218,7 +216,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2})); + .setPublishedObject(message1, message2)); } } } @@ -248,7 +246,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2, message3})); + .setPublishedObject(message1, message2, message3)); } catch (IllegalArgumentException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -263,7 +261,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2, message3})); + .setPublishedObject(message1, message2, message3)); } catch (InvocationTargetException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -271,8 +269,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2, message3})); - + .setPublishedObject(message1, message2, message3)); } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -280,7 +277,7 @@ public class Subscription { .setCause(e) .setHandler(handler) .setListener(listener) - .setPublishedObject(new Object[] {message1, message2, message3})); + .setPublishedObject(message1, message2, message3)); } } } @@ -328,7 +325,6 @@ public class Subscription { .setHandler(handler) .setListener(listener) .setPublishedObject(messages)); - } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java index c7dfc94..903f293 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java @@ -11,13 +11,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -import net.engio.mbassy.bus.error.MessageBusException; import net.engio.mbassy.common.ObjectTree; import net.engio.mbassy.common.ReflectionUtils; import net.engio.mbassy.common.WeakConcurrentSet; import net.engio.mbassy.dispatch.IHandlerInvocation; import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation; +import net.engio.mbassy.error.MessageBusException; import net.engio.mbassy.listener.MessageHandler; import net.engio.mbassy.listener.MetadataReader; @@ -225,7 +225,7 @@ public class SubscriptionManager { Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { + if (sub.handlesMessageType(eventSuperType)) { subscriptions.add(sub); } } @@ -235,20 +235,17 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - // tricky part. We have to check the ARRAY version + // tricky part. We have to check the ARRAY version, for (Class eventSuperType : types1) { - if (!eventSuperType.isArray()) { - // messy, but the ONLY way to do it. - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - } + // messy, but the ONLY way to do it. + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.isVarArg() && sub.handlesMessageType(messageType)) { - subscriptions.add(sub); - } + subscriptions.add(sub); } } } @@ -286,7 +283,7 @@ public class SubscriptionManager { Collection subs = leaf2.getValue(); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType1, messageType2)) { + if (sub.handlesMessageType(eventSuperType1, eventSuperType2)) { subscriptions.add(sub); } } @@ -302,18 +299,15 @@ public class SubscriptionManager { if (messageType1 == messageType2) { // tricky part. We have to check the ARRAY version for (Class eventSuperType : types1) { - if (!eventSuperType.isArray()) { - // messy, but the ONLY way to do it. - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - } + // messy, but the ONLY way to do it. + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.isVarArg() && sub.handlesMessageType(messageType1)) { - subscriptions.add(sub); - } + subscriptions.add(sub); } } } @@ -360,7 +354,7 @@ public class SubscriptionManager { Collection subs = leaf3.getValue(); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType1, messageType2, messageType3)) { + if (sub.handlesMessageType(eventSuperType1, eventSuperType2, eventSuperType3)) { subscriptions.add(sub); } } @@ -378,18 +372,15 @@ public class SubscriptionManager { if (messageType1 == messageType2 && messageType2 == messageType3) { // tricky part. We have to check the ARRAY version for (Class eventSuperType : types1) { - if (!eventSuperType.isArray()) { - // messy, but the ONLY way to do it. - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - } + // messy, but the ONLY way to do it. + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.isVarArg() && sub.handlesMessageType(messageType1)) { - subscriptions.add(sub); - } + subscriptions.add(sub); } } } @@ -431,7 +422,7 @@ public class SubscriptionManager { } - // add all subscriptions that match super types + // add all subscriptions that match super types combinations // have to use recursion for this. BLEH getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes); @@ -443,18 +434,15 @@ public class SubscriptionManager { // tricky part. We have to check the ARRAY version for (Class eventSuperType : types[0]) { - if (!eventSuperType.isArray()) { - // messy, but the ONLY way to do it. - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - } + // messy, but the ONLY way to do it. + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.isVarArg() && sub.handlesMessageType(firstType)) { - subscriptions.add(sub); - } + subscriptions.add(sub); } } } diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java index e57d35a..40aae59 100644 --- a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -18,7 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { @Test public void testSingleThreadedSyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = new MBassador(); + IMessageBus fifoBUs = new MBassador().start(); List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ @@ -53,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { @Test public void testSingleThreadedSyncAsyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = new MBassador(1); + IMessageBus fifoBUs = new MBassador(1).start(); List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index 89f57cc..cac3e92 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -3,7 +3,6 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.annotations.Handler; -import net.engio.mbassy.bus.DeadMessage; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java index 289cb0d..4c0d921 100644 --- a/src/test/java/net/engio/mbassy/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -2,13 +2,13 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.MessageManager; import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.error.IPublicationErrorHandler; +import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.listeners.ExceptionThrowingListener; import net.engio.mbassy.listeners.IMessageListener; import net.engio.mbassy.listeners.Listeners; @@ -102,7 +102,7 @@ public class MBassadorTest extends MessageBusTest { } }; - final MBassador bus = new MBassador(); + final MBassador bus = new MBassador().start(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); diff --git a/src/test/java/net/engio/mbassy/MultiMessageTest.java b/src/test/java/net/engio/mbassy/MultiMessageTest.java index cce9c53..fdc9e8d 100644 --- a/src/test/java/net/engio/mbassy/MultiMessageTest.java +++ b/src/test/java/net/engio/mbassy/MultiMessageTest.java @@ -20,7 +20,7 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSending(){ - IMessageBus bus = new MBassador(); + IMessageBus bus = new MBassador().start(); Listener listener = new Listener(); bus.subscribe(listener); @@ -31,45 +31,47 @@ public class MultiMessageTest extends MessageBusTest { bus.publish("s", "s", "s", "s"); bus.publish(1, 2, "s"); bus.publish(1, 2, 3, 4, 5, 6); + bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); - assertEquals(count.get(), 5); + assertEquals(count.get(), 10); } + @SuppressWarnings("unused") public static class Listener { @Handler - @SuppressWarnings("unused") public void handleSync(String o1) { count.getAndIncrement(); + System.err.println("match String"); } @Handler - @SuppressWarnings("unused") public void handleSync(String o1, String o2) { count.getAndIncrement(); + System.err.println("match String, String"); } @Handler - @SuppressWarnings("unused") public void handleSync(String o1, String o2, String o3) { count.getAndIncrement(); + System.err.println("match String, String, String"); } @Handler - @SuppressWarnings("unused") public void handleSync(Integer o1, Integer o2, String o3) { count.getAndIncrement(); - } - - @Handler(vararg = true) - @SuppressWarnings("unused") - public void handleSync(String... o) { - count.getAndIncrement(); + System.err.println("match Integer, Integer, String"); + } + + @Handler + public void handleSync(String... o) { + count.getAndIncrement(); + System.err.println("match String[]"); } @Handler - @SuppressWarnings("unused") public void handleSync(Integer... o) { count.getAndIncrement(); + System.err.println("match Integer[]"); } } } diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index 6f48ffe..5558007 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -2,12 +2,12 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.error.IPublicationErrorHandler; +import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.listeners.ExceptionThrowingListener; import net.engio.mbassy.listeners.IMessageListener; import net.engio.mbassy.listeners.MessagesListener; @@ -113,7 +113,7 @@ public abstract class SyncBusTest extends MessageBusTest { @Override protected IMessageBus getSyncMessageBus() { - return new MBassador(); + return new MBassador().start(); } } diff --git a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java index 09c1264..68d11c1 100644 --- a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java +++ b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java @@ -2,7 +2,11 @@ package net.engio.mbassy.common; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Run various tests concurrently. A given instance of runnable will be used to spawn and start @@ -16,55 +20,56 @@ import java.util.concurrent.*; public class ConcurrentExecutor { - public static void runConcurrent(final Runnable unit, int numberOfConcurrentExecutions) { - Runnable[] units = new Runnable[numberOfConcurrentExecutions]; - // create the tasks and schedule for execution - for (int i = 0; i < numberOfConcurrentExecutions; i++) { - units[i] = unit; - } - runConcurrent(units); - } + public static void runConcurrent(final Runnable unit, int numberOfConcurrentExecutions) { + Runnable[] units = new Runnable[numberOfConcurrentExecutions]; + // create the tasks and schedule for execution + for (int i = 0; i < numberOfConcurrentExecutions; i++) { + units[i] = unit; + } + runConcurrent(units); + } public static void runConcurrent(int numberOfConcurrentExecutions, final Runnable... units) { Runnable[] runnables = new Runnable[numberOfConcurrentExecutions * units.length]; // create the tasks and schedule for execution for (int i = 0; i < numberOfConcurrentExecutions; i++) { - for(int k = 0; k < units.length; k++) + for(int k = 0; k < units.length; k++) { runnables[k * numberOfConcurrentExecutions +i] = units[k]; + } } runConcurrent(runnables); } public static void runConcurrent(final Runnable... units) { - ExecutorService executor = Executors.newCachedThreadPool(); - List> returnValues = new ArrayList>(); + ExecutorService executor = Executors.newCachedThreadPool(); + List> returnValues = new ArrayList>(); - // create the tasks and schedule for execution - for (final Runnable unit : units) { - Callable wrapper = new Callable() { - @Override - public Long call() throws Exception { - long start = System.currentTimeMillis(); - unit.run(); - return System.currentTimeMillis() - start; - } - }; - returnValues.add(executor.submit(wrapper)); - } + // create the tasks and schedule for execution + for (final Runnable unit : units) { + Callable wrapper = new Callable() { + @Override + public Long call() throws Exception { + long start = System.currentTimeMillis(); + unit.run(); + return System.currentTimeMillis() - start; + } + }; + returnValues.add(executor.submit(wrapper)); + } - // wait until all tasks have been executed - try { - executor.shutdown();// tells the thread pool to execute all waiting tasks - executor.awaitTermination(5, TimeUnit.MINUTES); - } catch (InterruptedException e) { - // unlikely that this will happen - e.printStackTrace(); - } + // wait until all tasks have been executed + try { + executor.shutdown();// tells the thread pool to execute all waiting tasks + executor.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + // unlikely that this will happen + e.printStackTrace(); + } - for(Future task : returnValues){ + for(Future task : returnValues){ try { task.get(); } catch (Exception e) { @@ -72,7 +77,7 @@ public class ConcurrentExecutor { } } - } + } } diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index d5334fc..35b5326 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -2,8 +2,8 @@ package net.engio.mbassy.common; import junit.framework.Assert; import net.engio.mbassy.MBassador; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.PublicationError; +import net.engio.mbassy.error.IPublicationErrorHandler; +import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.messages.MessageTypes; import org.junit.Before; @@ -44,13 +44,13 @@ public abstract class MessageBusTest extends AssertSupport { public MBassador createBus() { - MBassador bus = new MBassador(); + MBassador bus = new MBassador().start(); bus.addErrorHandler(TestFailingHandler); return bus; } public MBassador createBus(ListenerFactory listeners) { - MBassador bus = new MBassador(); + MBassador bus = new MBassador().start(); bus.addErrorHandler(TestFailingHandler); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); return bus; diff --git a/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java index 690452e..085bd15 100644 --- a/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java +++ b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java @@ -25,11 +25,11 @@ public class SubscriptionValidator extends AssertSupport{ this.subscribedListener = subscribedListener; } - public Expectation listener(Class subscriber){ + public Expectation listener(Class subscriber) { return new Expectation(subscriber); } - private SubscriptionValidator expect(Class subscriber, Class messageType){ + private SubscriptionValidator expect(Class subscriber, Class messageType) { this.validations.add(new ValidationEntry(messageType, subscriber)); this.messageTypes.add(messageType); return this; @@ -37,8 +37,8 @@ public class SubscriptionValidator extends AssertSupport{ // match subscriptions with existing validation entries // for each tuple of subscriber and message type the specified number of listeners must exist - public void validate(SubscriptionManager manager){ - for(Class messageType : this.messageTypes){ + public void validate(SubscriptionManager manager) { + for(Class messageType : this.messageTypes) { Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); Collection validationEntries = getEntries(messageType); assertEquals(subscriptions.size(), validationEntries.size()); @@ -59,7 +59,7 @@ public class SubscriptionValidator extends AssertSupport{ - private Collection getEntries(Class messageType){ + private Collection getEntries(Class messageType) { Collection matching = new LinkedList(); for (ValidationEntry validationValidationEntry : this.validations){ if (validationValidationEntry.messageType.equals(messageType)) { @@ -70,16 +70,16 @@ public class SubscriptionValidator extends AssertSupport{ } - public class Expectation{ + public class Expectation { - private Class listener; + private Class listener; - private Expectation(Class listener) { + private Expectation(Class listener) { this.listener = listener; } - public SubscriptionValidator handles(Class ...messages){ - for(Class message : messages) { + public SubscriptionValidator handles(Class ...messages){ + for(Class message : messages) { expect(this.listener, message); } return SubscriptionValidator.this; @@ -87,18 +87,12 @@ public class SubscriptionValidator extends AssertSupport{ } private class ValidationEntry { + private Class subscriber; + private Class messageType; - - private Class subscriber; - - private Class messageType; - - private ValidationEntry(Class messageType, Class subscriber) { + private ValidationEntry(Class messageType, Class subscriber) { this.messageType = messageType; this.subscriber = subscriber; } - - } - }