diff --git a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java new file mode 100644 index 0000000..79cf4a3 --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java @@ -0,0 +1,83 @@ +package net.engio.mbassy.multi; + +import java.util.Collection; +import java.util.concurrent.locks.LockSupport; + +import net.engio.mbassy.multi.common.DeadMessage; +import net.engio.mbassy.multi.common.TransferQueue; +import net.engio.mbassy.multi.error.ErrorHandlingSupport; +import net.engio.mbassy.multi.error.PublicationError; +import net.engio.mbassy.multi.subscription.Subscription; +import net.engio.mbassy.multi.subscription.SubscriptionManager; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public class DispatchRunnable implements Runnable { + + private ErrorHandlingSupport errorHandler; + private TransferQueue dispatchQueue; + private TransferQueue invokeQueue; + private SubscriptionManager manager; + + public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, + TransferQueue dispatchQueue, TransferQueue invokeQueue) { + + this.errorHandler = errorHandler; + this.manager = subscriptionManager; + this.dispatchQueue = dispatchQueue; + this.invokeQueue = invokeQueue; + } + + @Override + public void run() { + final SubscriptionManager manager = this.manager; + final ErrorHandlingSupport errorHandler = this.errorHandler; + final TransferQueue IN_queue = this.dispatchQueue; + final TransferQueue OUT_queue = this.invokeQueue; + + Object message = null; + int counter; + + while (true) { + try { + counter = MultiMBassador.WORK_RUN_BLITZ; + while ((message = IN_queue.poll()) == null) { + if (counter > MultiMBassador.WORK_RUN_BLITZ_DIV2) { + --counter; + Thread.yield(); + } else if (counter > 0) { + --counter; + LockSupport.parkNanos(1L); + } else { + message = IN_queue.take(); + break; + } + } + + Class messageClass = message.getClass(); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + + boolean empty = subscriptions.isEmpty(); + if (empty) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message); + message = deadMessage; + empty = subscriptions.isEmpty(); + } + + if (!empty) { + Runnable e = new InvokeRunnable(errorHandler, subscriptions, message); + OUT_queue.transfer(e); + } + } catch (InterruptedException e) { + return; + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java new file mode 100644 index 0000000..ff1b85c --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java @@ -0,0 +1,54 @@ +package net.engio.mbassy.multi; + +import java.lang.reflect.Array; +import java.util.Collection; + +import net.engio.mbassy.multi.error.ErrorHandlingSupport; +import net.engio.mbassy.multi.subscription.Subscription; + + +/** + * @author dorkbox, llc Date: 2/2/15 + */ +public class InvokeRunnable implements Runnable { + + private ErrorHandlingSupport errorHandler; + private Collection subscriptions; + private Object message; + + public InvokeRunnable(ErrorHandlingSupport errorHandler, Collection subscriptions, Object message) { + this.errorHandler = errorHandler; + this.subscriptions = subscriptions; + this.message = message; + } + + @Override + public void run() { + ErrorHandlingSupport errorHandler = this.errorHandler; + Collection subs = this.subscriptions; + Object message = this.message; + Object[] vararg = null; + + for (Subscription sub : subs) { + boolean handled = false; + if (sub.isVarArg()) { + // messageClass will NEVER be an array to begin with, since that will call the multi-arg method + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + } + handled = true; + sub.publishToSubscription(errorHandler, vararg); + } + + if (!handled) { + sub.publishToSubscription(errorHandler, message); + } + } + } +} diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index ca3af16..84d9e38 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,20 +1,17 @@ package net.engio.mbassy.multi; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; +import net.engio.mbassy.multi.common.Pow2; import net.engio.mbassy.multi.common.TransferQueue; -import net.engio.mbassy.multi.disruptor.DeadMessage; -import net.engio.mbassy.multi.disruptor.MessageHolder; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; -import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.SubscriptionManager; -import dorkbox.util.objectPool.PoolableObject; /** * The base class for all message bus implementations with support for asynchronous message dispatch @@ -25,60 +22,28 @@ import dorkbox.util.objectPool.PoolableObject; */ public class MultiMBassador implements IMessageBus { -// private static final DisruptorThreadFactory THREAD_FACTORY = new DisruptorThreadFactory("MPMC"); - // error handling is first-class functionality // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new ArrayList(); private final SubscriptionManager subscriptionManager; - - // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down -// private final ExecutorService dispatch_Executor; -// private final ExecutorService invoke_Executor; - - // private final Queue dispatchQueue; // private final BlockingQueue dispatchQueue; private final TransferQueue dispatchQueue; private final TransferQueue invokeQueue; -// private final SynchronousQueue dispatchQueue; -// private Queue> mpmcArrayQueue; // all threads that are available for asynchronous message dispatching private List threads; -// private dorkbox.util.objectPool.ObjectPool pool; - - - // must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads. -// private final int dispatch_RingBufferSize = 4; -// private final int invoke_RingBufferSize = 2048; - -// private final Disruptor dispatch_Disruptor; -// private final RingBuffer dispatch_RingBuffer; - -// private final Disruptor invoke_Disruptor; -// private final RingBuffer invoke_RingBuffer; -// private final WorkerPool invoke_WorkerPool; - - - - public static class HolderPoolable implements PoolableObject { - @Override - public MessageHolder create() { - return new MessageHolder(); - } - } - public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()*2); + this(Runtime.getRuntime().availableProcessors()); } -// private long counter = 0L; + public static final int WORK_RUN_BLITZ = 50; + public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2; public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { @@ -96,118 +61,18 @@ public class MultiMBassador implements IMessageBus { // this.dispatchQueue = new SynchronousQueue(); // this.dispatchQueue = new LinkedBlockingQueue(Pow2.roundToPowerOfTwo(numberOfThreads)); - -// this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES, -// new SynchronousQueue(), -// new DisruptorThreadFactory("MB_Dispatch")); - -// this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, -// new SynchronousQueue(), -// new DisruptorThreadFactory("MB_Invoke")); - - - this.subscriptionManager = new SubscriptionManager(); -// this.mpmcArrayQueue = new MpmcArrayQueue>(numberOfThreads*2); -// this.pool = ObjectPoolFactory.create(new HolderPoolable(), numberOfThreads*2); - int dispatchSize = 2; -// int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads); - int invokeSize = numberOfThreads*2-dispatchSize; + int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); this.threads = new ArrayList(dispatchSize + invokeSize); DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); for (int i = 0; i < dispatchSize; i++) { // each thread will run forever and process incoming message publication requests - Runnable runnable = new Runnable() { - @Override - public void run() { - final MultiMBassador mbassador = MultiMBassador.this; - SubscriptionManager manager = mbassador.subscriptionManager; - final TransferQueue IN_queue = mbassador.dispatchQueue; - final TransferQueue OUT_queue = mbassador.invokeQueue; - - Object message = null; - while (true) { - try { - message = IN_queue.take(); - Class messageClass = message.getClass(); - - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - - boolean empty = subscriptions.isEmpty(); - if (empty) { - - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - - DeadMessage deadMessage = new DeadMessage(message); - message = deadMessage; - empty = subscriptions.isEmpty(); - } - - if (!empty) { - final Collection finalSubs = subscriptions; - final Object finalMessage = message; - Runnable e = new Runnable() { - @Override - public void run() { -// MultiMBassador mbassador = MultiMBassador.this; - Collection subs = finalSubs; - for (Subscription sub : subs) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; -// -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// handled = true; -// sub.publishToSubscription(mbassador, vararg); -// } -// -// if (!handled) { - sub.publishToSubscription(mbassador, finalMessage); -// } - } - } - }; - - -// counter = 5; -// while (!OUT_queue.offer(e)) { -// if (counter > 3) { -// --counter; -// Thread.yield(); -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -//// LockSupport.parkNanos(1L); -// } else { - OUT_queue.transfer(e); -// break; -// } -// } - -// OUT_queue.transfer(e); -// OUT_queue.put(e); - } - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); - } - } - } - }; + Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue, this.invokeQueue); Thread runner = dispatchThreadFactory.newThread(runnable); this.threads.add(runner); @@ -219,17 +84,36 @@ public class MultiMBassador implements IMessageBus { for (int i = 0; i < invokeSize; i++) { // each thread will run forever and process incoming message publication requests Runnable runnable = new Runnable() { + @SuppressWarnings("null") @Override public void run() { final MultiMBassador mbassador = MultiMBassador.this; final TransferQueue IN_queue = mbassador.invokeQueue; try { + Runnable runnable = null; + int counter; + while (true) { - IN_queue.take().run(); + runnable = null; + counter = WORK_RUN_BLITZ; + + while ((runnable = IN_queue.poll()) == null) { + if (counter > WORK_RUN_BLITZ_DIV2) { + --counter; + Thread.yield(); + } else if (counter > 0) { + --counter; + LockSupport.parkNanos(1L); + } else { + runnable = IN_queue.take(); + break; + } + } + + runnable.run(); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); return; } } @@ -239,142 +123,8 @@ public class MultiMBassador implements IMessageBus { this.threads.add(runner); runner.start(); } - - - - - - - - - - - - - - - - -//////////////////////////// -// PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); -// -// this.dispatch_Disruptor = new Disruptor(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, -// ProducerType.MULTI, new SleepingWaitStrategy()); -// this.invoke_Disruptor = new Disruptor(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor, -// ProducerType.MULTI, new SleepingWaitStrategy()); -// -// -// this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); -// this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer(); -// -// // not too many handlers, so we don't contend the locks in the subscription manager -// EventHandler dispatchHandlers[] = new DispatchProcessor[4]; -// for (int i = 0; i < dispatchHandlers.length; i++) { -// dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue); -// } - -// WorkHandler invokeHandlers[] = new InvokeProcessor[numberOfThreads]; -// for (int i = 0; i < invokeHandlers.length; i++) { -// invokeHandlers[i] = new InvokeProcessor(this); -// } -// -// this.dispatch_Disruptor.handleEventsWith(dispatchHandlers); -// this.invoke_WorkerPool = new WorkerPool(this.invoke_RingBuffer, -// this.invoke_RingBuffer.newBarrier(), -// loggingExceptionHandler, -// invokeHandlers); -// -// this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences()); -///////////////////////////////// - -// -// this.runners = new ArrayList(numberOfThreads); -// for (int i = 0; i < numberOfThreads; i++) { -// // each thread will run forever and process incoming message publication requests -// InterruptRunnable runnable = new InterruptRunnable() { -// @Override -// public void run() { -// final int DEFAULT_RETRIES = 200; -// final MultiMBassador mbassador = MultiMBassador.this; -// final Queue> queue = mbassador.mpmcArrayQueue; -// final ObjectPool pool2 = mbassador.pool; -// -// int counter = DEFAULT_RETRIES; -// ObjectPoolHolder holder = null; -// MessageHolder value; -// -// while (this.running) { -// holder = queue.poll(); -// if (holder != null) { -// // sends off to an executor -// value = holder.getValue(); -// Collection subscriptions = value.subscriptionsHolder; -// Object message = value.message1; -// Class messageClass = message.getClass(); -// -// if (messageClass.equals(DeadMessage.class)) { -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(mbassador, message); -// } -// } else { -// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; -// -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(mbassador, vararg); -// } -// -// if (!handled) { -// sub.publishToSubscription(mbassador, message); -// } -// } -// } -// -// pool2.release(holder); -// counter = DEFAULT_RETRIES; -// } else { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); -// counter = DEFAULT_RETRIES; -// } -// } -// -//// handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",holder)); -// } -// } -// }; -// Thread runner = THREAD_FACTORY.newThread(runnable); -// this.runners.add(runnable); -// runner.start(); -// } } - public final MultiMBassador start() { -// this.invoke_WorkerPool.start(this.invoke_Executor); -// this.invoke_Disruptor.start(); -// this.dispatch_Disruptor.start(); - return this; - } - - @Override public final void addErrorHandler(IPublicationErrorHandler handler) { synchronized (this.errorHandlers) { diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java b/src/main/java/net/engio/mbassy/multi/common/DeadMessage.java similarity index 96% rename from src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java rename to src/main/java/net/engio/mbassy/multi/common/DeadMessage.java index 49a991c..9a7249e 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java +++ b/src/main/java/net/engio/mbassy/multi/common/DeadMessage.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.multi.disruptor; +package net.engio.mbassy.multi.common; /** * The dead message event is published whenever no message diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchFactory.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchFactory.java deleted file mode 100644 index 18fe9fe..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import com.lmax.disruptor.EventFactory; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class DispatchFactory implements EventFactory { - - public DispatchFactory() { - } - - @Override - public DispatchHolder newInstance() { - return new DispatchHolder(); - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java deleted file mode 100644 index 6f9c82a..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java +++ /dev/null @@ -1,18 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class DispatchHolder { - public MessageType messageType = MessageType.ONE; - - public Object message1 = null; - public Object message2 = null; - public Object message3 = null; - public Object[] messages = null; - - public DispatchHolder() { - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java deleted file mode 100644 index 85e0442..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java +++ /dev/null @@ -1,137 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import java.util.Collection; -import java.util.Queue; - -import net.engio.mbassy.multi.MultiMBassador; -import net.engio.mbassy.multi.error.PublicationError; -import net.engio.mbassy.multi.subscription.Subscription; -import net.engio.mbassy.multi.subscription.SubscriptionManager; - -import com.lmax.disruptor.EventHandler; - -/** - * @author dorkbox, llc Date: 2/2/15 - */ -public class DispatchProcessor implements EventHandler { - private final MultiMBassador publisher; - private final long ordinal; - private final long numberOfConsumers; - - private final SubscriptionManager subscriptionManager; -// private final RingBuffer invoke_RingBuffer; - private final Queue queue; - - public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers, - final SubscriptionManager subscriptionManager, Queue queue) { - this.publisher = publisher; - - this.ordinal = ordinal; - this.numberOfConsumers = numberOfConsumers; - this.subscriptionManager = subscriptionManager; - this.queue = queue; - } - - - @Override - public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception { - if (sequence % this.numberOfConsumers == this.ordinal) { - // Process the event - // switch (event.messageType) { - // case ONE: { - publish(event.message1); - event.message1 = null; // cleanup - // return; - // } - // case TWO: { - // // publisher.publish(this.message1, this.message2); - // event.message1 = null; // cleanup - // event.message2 = null; // cleanup - // return; - // } - // case THREE: { - // // publisher.publish(this.message1, this.message2, this.message3); - // event.message1 = null; // cleanup - // event.message2 = null; // cleanup - // event.message3 = null; // cleanup - // return; - // } - // case ARRAY: { - // // publisher.publish(this.messages); - // event.messages = null; // cleanup - // return; - // } - // } - - } - } - - private void publish(Object message) { - Class messageClass = message.getClass(); - - SubscriptionManager manager = this.subscriptionManager; - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - - try { - boolean empty = subscriptions.isEmpty(); - if (empty) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - - message = new DeadMessage(message); - - empty = subscriptions.isEmpty(); - } - - if (!empty) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.invoke_RingBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// eventJob.subscriptions = subscriptions; -// } catch (Throwable e) { -// this.publisher.handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - - - -// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are -// ObjectPoolHolder messageHolder = this.pool.take(); -// MessageHolder value = messageHolder.getValue(); - MessageHolder messageHolder = new MessageHolder(); - messageHolder.subscriptions= subscriptions; - messageHolder.messageType = MessageType.ONE; - messageHolder.message1 = message; - -// this.queue.put(messageHolder); - -// int counter = 200; -// while (!this.queue.offer(messageHolder)) { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); -// } -// } - } - } catch (Throwable e) { - this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) - .setPublishedObject(message)); - } - } - -} diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/InvokeFactory.java b/src/main/java/net/engio/mbassy/multi/disruptor/InvokeFactory.java deleted file mode 100644 index d93d6b5..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/InvokeFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import com.lmax.disruptor.EventFactory; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class InvokeFactory implements EventFactory { - - public InvokeFactory() { - } - - @Override - public MessageHolder newInstance() { - return new MessageHolder(); - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/InvokeProcessor.java b/src/main/java/net/engio/mbassy/multi/disruptor/InvokeProcessor.java deleted file mode 100644 index e24c539..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/InvokeProcessor.java +++ /dev/null @@ -1,85 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import java.util.Collection; - -import net.engio.mbassy.multi.MultiMBassador; -import net.engio.mbassy.multi.subscription.Subscription; - -import com.lmax.disruptor.WorkHandler; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class InvokeProcessor implements WorkHandler { - private final MultiMBassador publisher; - - public InvokeProcessor(final MultiMBassador publisher) { - this.publisher = publisher; - } - - @Override - public void onEvent(MessageHolder event) throws Exception { -// switch (event.messageType) { -// case ONE: { - publish(event.subscriptions, event.message1); - event.message1 = null; // cleanup - event.subscriptions = null; -// return; -// } -// case TWO: { -//// publisher.publish(this.message1, this.message2); -// event.message1 = null; // cleanup -// event.message2 = null; // cleanup -// return; -// } -// case THREE: { -//// publisher.publish(this.message1, this.message2, this.message3); -// event.message1 = null; // cleanup -// event.message2 = null; // cleanup -// event.message3 = null; // cleanup -// return; -// } -// case ARRAY: { -//// publisher.publish(this.messages); -// event.messages = null; // cleanup -// return; -// } -// } - } - - private void publish(Collection subscriptions, Object message) { - Class messageClass = message.getClass(); - - if (messageClass.equals(DeadMessage.class)) { - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this.publisher, message); - } - } else { - Object[] vararg = null; - - for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; -// -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(this.publisher, vararg); -// } - -// if (!handled) { - sub.publishToSubscription(this.publisher, message); -// } - } - } - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/MessageHolder.java b/src/main/java/net/engio/mbassy/multi/disruptor/MessageHolder.java deleted file mode 100644 index 48b41a6..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/MessageHolder.java +++ /dev/null @@ -1,53 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import java.util.Collection; - -import net.engio.mbassy.multi.MultiMBassador; -import net.engio.mbassy.multi.subscription.Subscription; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class MessageHolder { - public MessageType messageType = MessageType.ONE; - - public Object message1 = null; - public Object message2 = null; - public Object message3 = null; - public Object[] messages = null; - - public Collection subscriptions; - - public MessageHolder() { - } - - public void publish(MultiMBassador publisher) { -// -// switch (this.messageType) { -// case ONE: { -// publisher.publishExecutor(this.message1); -// this.message1 = null; // cleanup -// return; -// } -// case TWO: { -// publisher.publish(this.message1, this.message2); -// this.message1 = null; // cleanup -// this.message2 = null; // cleanup -// return; -// } -// case THREE: { -// publisher.publish(this.message1, this.message2, this.message3); -// this.message1 = null; // cleanup -// this.message2 = null; // cleanup -// this.message3 = null; // cleanup -// return; -// } -// case ARRAY: { -// publisher.publish(this.messages); -// this.messages = null; // cleanup -// return; -// } -// } - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/MessageType.java b/src/main/java/net/engio/mbassy/multi/disruptor/MessageType.java deleted file mode 100644 index d81d8f1..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/MessageType.java +++ /dev/null @@ -1,9 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public enum MessageType { - ONE, TWO, THREE, ARRAY -} diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/PublicationExceptionHandler.java b/src/main/java/net/engio/mbassy/multi/disruptor/PublicationExceptionHandler.java deleted file mode 100644 index 6a33a9e..0000000 --- a/src/main/java/net/engio/mbassy/multi/disruptor/PublicationExceptionHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -package net.engio.mbassy.multi.disruptor; - -import net.engio.mbassy.multi.error.ErrorHandlingSupport; -import net.engio.mbassy.multi.error.PublicationError; - -import com.lmax.disruptor.ExceptionHandler; - -public final class PublicationExceptionHandler implements ExceptionHandler { - private final ErrorHandlingSupport errorHandler; - - public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) { - this.errorHandler = errorHandler; - } - - @Override - public void handleEventException(final Throwable e, final long sequence, final Object event) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")") - .setCause(e)); - } - - @Override - public void handleOnStartException(final Throwable e) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error starting the disruptor") - .setCause(e)); - } - - @Override - public void handleOnShutdownException(final Throwable e) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error stopping the disruptor") - .setCause(e)); - } -} diff --git a/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java index 73af4be..aac72b5 100644 --- a/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/multi/AsyncFIFOBusTest.java @@ -3,8 +3,6 @@ package net.engio.mbassy.multi; import java.util.LinkedList; import java.util.List; -import net.engio.mbassy.multi.IMessageBus; -import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.common.MessageBusTest; @@ -20,7 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { @Test public void testSingleThreadedSyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = new MultiMBassador().start(); + IMessageBus fifoBUs = new MultiMBassador(); List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ @@ -55,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { @Test public void testSingleThreadedSyncAsyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = new MultiMBassador(1).start(); + IMessageBus fifoBUs = new MultiMBassador(1); List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ diff --git a/src/test/java/net/engio/mbassy/multi/DeadMessageTest.java b/src/test/java/net/engio/mbassy/multi/DeadMessageTest.java index 703713c..b0e75eb 100644 --- a/src/test/java/net/engio/mbassy/multi/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/multi/DeadMessageTest.java @@ -5,10 +5,10 @@ import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.annotations.Handler; import net.engio.mbassy.multi.common.ConcurrentExecutor; +import net.engio.mbassy.multi.common.DeadMessage; import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.MessageBusTest; import net.engio.mbassy.multi.common.TestUtil; -import net.engio.mbassy.multi.disruptor.DeadMessage; import net.engio.mbassy.multi.listeners.IMessageListener; import net.engio.mbassy.multi.listeners.MessagesListener; import net.engio.mbassy.multi.listeners.ObjectListener; diff --git a/src/test/java/net/engio/mbassy/multi/MBassadorTest.java b/src/test/java/net/engio/mbassy/multi/MBassadorTest.java index 3f52882..5b8d44b 100644 --- a/src/test/java/net/engio/mbassy/multi/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/multi/MBassadorTest.java @@ -2,7 +2,6 @@ package net.engio.mbassy.multi; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.MessageBusTest; @@ -103,7 +102,7 @@ public class MBassadorTest extends MessageBusTest { } }; - final MultiMBassador bus = new MultiMBassador().start(); + final MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); diff --git a/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java b/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java index c6727dc..ff2ba80 100644 --- a/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java +++ b/src/test/java/net/engio/mbassy/multi/MultiMessageTest.java @@ -20,7 +20,7 @@ public class MultiMessageTest extends MessageBusTest { @Test public void testMultiMessageSending(){ - IMessageBus bus = new MultiMBassador().start(); + IMessageBus bus = new MultiMBassador(); Listener listener1 = new Listener(); bus.subscribe(listener1); diff --git a/src/test/java/net/engio/mbassy/multi/SyncBusTest.java b/src/test/java/net/engio/mbassy/multi/SyncBusTest.java index 22269a1..ebc7732 100644 --- a/src/test/java/net/engio/mbassy/multi/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/multi/SyncBusTest.java @@ -2,8 +2,6 @@ package net.engio.mbassy.multi; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.multi.IMessageBus; -import net.engio.mbassy.multi.MultiMBassador; import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ListenerFactory; import net.engio.mbassy.multi.common.MessageBusTest; @@ -31,7 +29,7 @@ public class SyncBusTest extends MessageBusTest { @Test public void testSynchronousMessagePublication() throws Exception { - final IMessageBus bus = new MultiMBassador().start(); + final IMessageBus bus = new MultiMBassador(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class) @@ -84,7 +82,7 @@ public class SyncBusTest extends MessageBusTest { } }; - final IMessageBus bus = new MultiMBassador().start(); + final IMessageBus bus = new MultiMBassador(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); diff --git a/src/test/java/net/engio/mbassy/multi/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/multi/common/MessageBusTest.java index 5e8d22c..997539c 100644 --- a/src/test/java/net/engio/mbassy/multi/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/multi/common/MessageBusTest.java @@ -42,13 +42,13 @@ public abstract class MessageBusTest extends AssertSupport { public MultiMBassador createBus() { - MultiMBassador bus = new MultiMBassador().start(); + MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(TestFailingHandler); return bus; } public MultiMBassador createBus(ListenerFactory listeners) { - MultiMBassador bus = new MultiMBassador().start(); + MultiMBassador bus = new MultiMBassador(); bus.addErrorHandler(TestFailingHandler); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); return bus;