From 4d5b53fa36454632c8b45c20c438024c8378183c Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 6 Feb 2016 23:40:58 +0100 Subject: [PATCH] Cleaned up sync/async publication --- src/dorkbox/util/messagebus/MessageBus.java | 6 +- .../messagebus/subscription/Subscription.java | 15 -- .../util/messagebus/synchrony/AsyncABQ.java | 171 ++++++++-------- .../messagebus/synchrony/AsyncABQ_noGc.java | 183 +++++++++--------- .../messagebus/synchrony/AsyncDisruptor.java | 26 +-- .../synchrony/disruptor/MessageHandler.java | 75 ++++--- 6 files changed, 230 insertions(+), 246 deletions(-) diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 985360f..946ce80 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -135,15 +135,15 @@ class MessageBus implements IMessageBus { // the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue if (useDisruptorForAsyncPublish) { - asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, publisher, syncPublication); + asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication); } else { if (useNoGarbageVersionOfABQ) { // no garbage is created, but this is slow (but faster than other messagebus implementations) - asyncPublication = new AsyncABQ_noGc(numberOfThreads, errorHandler, publisher, syncPublication); + asyncPublication = new AsyncABQ_noGc(numberOfThreads, errorHandler, syncPublication); } else { // garbage is created, but this is fast - asyncPublication = new AsyncABQ(numberOfThreads, errorHandler, publisher, syncPublication); + asyncPublication = new AsyncABQ(numberOfThreads, errorHandler, syncPublication); } } } diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index 7d81d52..3e8970b 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -67,12 +67,6 @@ class Subscription { this.handler = handler; entries = new IdentityMap(32, SubscriptionManager.LOAD_FACTOR); - - - if (handler.acceptsSubtypes()) { - // TODO keep a list of "super-class" messages that access this. This is updated by multiple threads. This is so we know WHAT - // super-subscriptions to clear when we sub/unsub - } } // called on shutdown for GC purposes @@ -143,21 +137,12 @@ class Subscription { return this.entries.size; } - /** - * @return true if messages were published - */ public abstract void publish(final Object message) throws Throwable; - /** - * @return true if messages were published - */ public abstract void publish(final Object message1, final Object message2) throws Throwable; - /** - * @return true if messages were published - */ public abstract void publish(final Object message1, final Object message2, final Object message3) throws Throwable; diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java index 5f6c46a..2e75d6b 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java @@ -18,7 +18,6 @@ package dorkbox.util.messagebus.synchrony; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.publication.Publisher; import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.disruptor.MessageType; @@ -47,110 +46,100 @@ class AsyncABQ implements Synchrony { public - AsyncABQ(final int numberOfThreads, - final ErrorHandlingSupport errorHandler, - final Publisher publisher, - final Synchrony syncPublication) { + AsyncABQ(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) { this.dispatchQueue = new ArrayBlockingQueue(1024); + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @SuppressWarnings("ConstantConditions") + @Override + public + void run() { + final ArrayBlockingQueue IN_QUEUE = AsyncABQ.this.dispatchQueue; + final Synchrony syncPublication1 = syncPublication; + final ErrorHandlingSupport errorHandler1 = errorHandler; + + while (!AsyncABQ.this.shuttingDown) { + process(IN_QUEUE, syncPublication1, errorHandler1); + } + } + }; + this.threads = new ArrayDeque(numberOfThreads); final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); for (int i = 0; i < numberOfThreads; i++) { - - // each thread will run forever and process incoming message publication requests - Runnable runnable = new Runnable() { - @Override - public - void run() { - final ArrayBlockingQueue IN_QUEUE = AsyncABQ.this.dispatchQueue; - final Publisher publisher1 = publisher; - final Synchrony syncPublication1 = syncPublication; - final ErrorHandlingSupport errorHandler1 = errorHandler; - - MessageHolder event = null; - int messageType = MessageType.ONE; - Object message1 = null; - Object message2 = null; - Object message3 = null; - - while (!AsyncABQ.this.shuttingDown) { - try { - event = IN_QUEUE.take(); - messageType = event.type; - message1 = event.message1; - message2 = event.message2; - message3 = event.message3; - - switch (messageType) { - case MessageType.ONE: { - publisher1.publish(syncPublication1, message1); - break; - } - case MessageType.TWO: { - publisher1.publish(syncPublication1, message1, message2); - break; - } - case MessageType.THREE: { - publisher1.publish(syncPublication1, message3, message1, message2); - break; - } - } - } catch (InterruptedException e) { - if (!AsyncABQ.this.shuttingDown) { - switch (messageType) { - case MessageType.ONE: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - case MessageType.TWO: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1, message2); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - case MessageType.THREE: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1, message2, message3); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - } - } - } - } - } - }; - Thread runner = threadFactory.newThread(runnable); this.threads.add(runner); } } + @SuppressWarnings("Duplicates") + private + void process(final ArrayBlockingQueue queue, final Synchrony sync, final ErrorHandlingSupport errorHandler) { + MessageHolder event = null; + int messageType = MessageType.ONE; + Subscription[] subscriptions; + Object message1 = null; + Object message2 = null; + Object message3 = null; + + try { + event = queue.take(); + messageType = event.type; + subscriptions = event.subscriptions; + message1 = event.message1; + message2 = event.message2; + message3 = event.message3; + + switch (messageType) { + case MessageType.ONE: { + sync.publish(subscriptions, message1); + return; + } + case MessageType.TWO: { + sync.publish(subscriptions, message1, message2); + return; + } + case MessageType.THREE: { + sync.publish(subscriptions, message1, message2, message3); + //noinspection UnnecessaryReturnStatement + return; + } + } + } catch (Throwable e) { + if (event != null) { + switch (messageType) { + case MessageType.ONE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1)); + return; + } + case MessageType.TWO: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1, message2)); + return; + } + case MessageType.THREE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + //noinspection UnnecessaryReturnStatement + return; + } + } + } + } + } + + @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { MessageHolder take = new MessageHolder(); + take.type = MessageType.ONE; take.subscriptions = subscriptions; take.message1 = message1; @@ -162,6 +151,7 @@ class AsyncABQ implements Synchrony { public void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { MessageHolder take = new MessageHolder(); + take.type = MessageType.TWO; take.subscriptions = subscriptions; take.message1 = message1; @@ -174,6 +164,7 @@ class AsyncABQ implements Synchrony { public void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { MessageHolder take = new MessageHolder(); + take.type = MessageType.THREE; take.subscriptions = subscriptions; take.message1 = message1; diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java index 1a43930..404bfd4 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java @@ -18,7 +18,6 @@ package dorkbox.util.messagebus.synchrony; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.publication.Publisher; import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.disruptor.MessageType; @@ -42,7 +41,6 @@ class AsyncABQ_noGc implements Synchrony { // have two queues to prevent garbage, So we pull off one queue to add to another queue and when done, we put it back private final ArrayBlockingQueue gcQueue; - private final Collection threads; /** @@ -52,10 +50,7 @@ class AsyncABQ_noGc implements Synchrony { public - AsyncABQ_noGc(final int numberOfThreads, - final ErrorHandlingSupport errorHandler, - final Publisher publisher, - final Synchrony syncPublication) { + AsyncABQ_noGc(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) { this.dispatchQueue = new ArrayBlockingQueue(1024); this.gcQueue = new ArrayBlockingQueue(1024); @@ -65,107 +60,103 @@ class AsyncABQ_noGc implements Synchrony { gcQueue.add(new MessageHolder()); } + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @SuppressWarnings("ConstantConditions") + @Override + public + void run() { + final ArrayBlockingQueue IN_QUEUE = AsyncABQ_noGc.this.dispatchQueue; + final ArrayBlockingQueue OUT_QUEUE = AsyncABQ_noGc.this.gcQueue; + + final Synchrony syncPublication1 = syncPublication; + final ErrorHandlingSupport errorHandler1 = errorHandler; + + while (!AsyncABQ_noGc.this.shuttingDown) { + process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1); + } + } + }; + this.threads = new ArrayDeque(numberOfThreads); final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); for (int i = 0; i < numberOfThreads; i++) { - - // each thread will run forever and process incoming message publication requests - Runnable runnable = new Runnable() { - @Override - public - void run() { - final ArrayBlockingQueue IN_QUEUE = AsyncABQ_noGc.this.dispatchQueue; - final ArrayBlockingQueue OUT_QUEUE = AsyncABQ_noGc.this.gcQueue; - final Publisher publisher1 = publisher; - final Synchrony syncPublication1 = syncPublication; - final ErrorHandlingSupport errorHandler1 = errorHandler; - - MessageHolder event = null; - int messageType = MessageType.ONE; - Object message1 = null; - Object message2 = null; - Object message3 = null; - - while (!AsyncABQ_noGc.this.shuttingDown) { - try { - event = IN_QUEUE.take(); - messageType = event.type; - message1 = event.message1; - message2 = event.message2; - message3 = event.message3; - - OUT_QUEUE.put(event); - - - switch (messageType) { - case MessageType.ONE: { - publisher1.publish(syncPublication1, message1); - break; - } - case MessageType.TWO: { - publisher1.publish(syncPublication1, message1, message2); - break; - } - case MessageType.THREE: { - publisher1.publish(syncPublication1, message3, message1, message2); - break; - } - } - } catch (InterruptedException e) { - if (!AsyncABQ_noGc.this.shuttingDown) { - switch (messageType) { - case MessageType.ONE: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - case MessageType.TWO: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1, message2); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - case MessageType.THREE: { - PublicationError publicationError = new PublicationError() - .setMessage("Thread interrupted while processing message") - .setCause(e); - - if (event != null) { - publicationError.setPublishedObject(message1, message2, message3); - } - - errorHandler1.handlePublicationError(publicationError); - break; - } - } - } - } - } - } - }; - Thread runner = threadFactory.newThread(runnable); this.threads.add(runner); } } + @SuppressWarnings("Duplicates") + private + void process(final ArrayBlockingQueue queue, + final ArrayBlockingQueue gcQueue, + final Synchrony sync, + final ErrorHandlingSupport errorHandler) { + + MessageHolder event = null; + int messageType = MessageType.ONE; + Subscription[] subscriptions; + Object message1 = null; + Object message2 = null; + Object message3 = null; + + try { + event = queue.take(); + messageType = event.type; + subscriptions = event.subscriptions; + message1 = event.message1; + message2 = event.message2; + message3 = event.message3; + + gcQueue.put(event); + + switch (messageType) { + case MessageType.ONE: { + sync.publish(subscriptions, message1); + return; + } + case MessageType.TWO: { + sync.publish(subscriptions, message1, message2); + return; + } + case MessageType.THREE: { + sync.publish(subscriptions, message1, message2, message3); + //noinspection UnnecessaryReturnStatement + return; + } + } + } catch (Throwable e) { + if (event != null) { + switch (messageType) { + case MessageType.ONE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1)); + return; + } + case MessageType.TWO: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1, message2)); + return; + } + case MessageType.THREE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + //noinspection UnnecessaryReturnStatement + return; + } + } + } + } + } + @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { MessageHolder take = gcQueue.take(); + take.type = MessageType.ONE; take.subscriptions = subscriptions; take.message1 = message1; @@ -177,6 +168,7 @@ class AsyncABQ_noGc implements Synchrony { public void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { MessageHolder take = gcQueue.take(); + take.type = MessageType.TWO; take.subscriptions = subscriptions; take.message1 = message1; @@ -189,6 +181,7 @@ class AsyncABQ_noGc implements Synchrony { public void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { MessageHolder take = gcQueue.take(); + take.type = MessageType.THREE; take.subscriptions = subscriptions; take.message1 = message1; diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java index 869c41a..1099952 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java @@ -15,22 +15,14 @@ */ package dorkbox.util.messagebus.synchrony; -import com.lmax.disruptor.LiteBlockingWaitStrategy; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.Sequencer; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.WorkProcessor; +import com.lmax.disruptor.*; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.disruptor.EventBusFactory; import dorkbox.util.messagebus.synchrony.disruptor.MessageHandler; import dorkbox.util.messagebus.synchrony.disruptor.MessageType; import dorkbox.util.messagebus.synchrony.disruptor.PublicationExceptionHandler; -import dorkbox.util.messagebus.subscription.Subscription; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -43,15 +35,13 @@ import java.util.concurrent.locks.LockSupport; public final class AsyncDisruptor implements Synchrony { - private final ErrorHandlingSupport errorHandler; private WorkProcessor[] workProcessors; private MessageHandler[] handlers; private RingBuffer ringBuffer; private Sequence workSequence; public - AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Publisher publisher, final Synchrony syncPublication) { - this.errorHandler = errorHandler; + AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) { // Now we setup the disruptor and work handlers ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, @@ -65,7 +55,7 @@ class AsyncDisruptor implements Synchrony { // setup the work handlers handlers = new MessageHandler[numberOfThreads]; for (int i = 0; i < handlers.length; i++) { - handlers[i] = new MessageHandler(publisher, syncPublication); // exactly one per thread is used + handlers[i] = new MessageHandler(syncPublication, errorHandler); // exactly one per thread is used } @@ -79,14 +69,14 @@ class AsyncDisruptor implements Synchrony { WaitStrategy consumerWaitStrategy; -// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one +// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good blocking one // consumerWaitStrategy = new BlockingWaitStrategy(); // consumerWaitStrategy = new YieldingWaitStrategy(); -// consumerWaitStrategy = new BusySpinWaitStrategy(); +// consumerWaitStrategy = new BusySpinWaitStrategy(); // best for low latency // consumerWaitStrategy = new SleepingWaitStrategy(); // consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); -// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); - consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); + consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); // good combo ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); diff --git a/src/dorkbox/util/messagebus/synchrony/disruptor/MessageHandler.java b/src/dorkbox/util/messagebus/synchrony/disruptor/MessageHandler.java index f9f951d..c08003f 100644 --- a/src/dorkbox/util/messagebus/synchrony/disruptor/MessageHandler.java +++ b/src/dorkbox/util/messagebus/synchrony/disruptor/MessageHandler.java @@ -17,51 +17,77 @@ package dorkbox.util.messagebus.synchrony.disruptor; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.WorkHandler; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.error.PublicationError; +import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.MessageHolder; import dorkbox.util.messagebus.synchrony.Synchrony; -import dorkbox.util.messagebus.publication.Publisher; import java.util.concurrent.atomic.AtomicBoolean; /** - * @author dorkbox, llc - * Date: 2/2/15 + * @author dorkbox, llc Date: 2/2/15 */ public class MessageHandler implements WorkHandler, LifecycleAware { - private final Publisher publisher; + private final Synchrony syncPublication; + private final ErrorHandlingSupport errorHandler; - AtomicBoolean shutdown = new AtomicBoolean(false); - + private final AtomicBoolean shutdown = new AtomicBoolean(false); public - MessageHandler(final Publisher publisher, final Synchrony syncPublication) { - this.publisher = publisher; + MessageHandler(final Synchrony syncPublication, final ErrorHandlingSupport errorHandler) { this.syncPublication = syncPublication; + this.errorHandler = errorHandler; } + @SuppressWarnings("Duplicates") @Override public void onEvent(final MessageHolder event) throws Exception { final int messageType = event.type; - switch (messageType) { - case MessageType.ONE: { - this.publisher.publish(syncPublication, event.message1); - return; + final Subscription[] subscriptions = event.subscriptions; + + try { + switch (messageType) { + case MessageType.ONE: { + syncPublication.publish(subscriptions, event.message1); + return; + } + case MessageType.TWO: { + syncPublication.publish(subscriptions, event.message1, event.message2); + return; + } + case MessageType.THREE: { + syncPublication.publish(subscriptions, event.message1, event.message2, event.message3); + //noinspection UnnecessaryReturnStatement + return; + } } - case MessageType.TWO: { - Object message1 = event.message1; - Object message2 = event.message2; - this.publisher.publish(syncPublication, message1, message2); - return; - } - case MessageType.THREE: { - Object message1 = event.message1; - Object message2 = event.message2; - Object message3 = event.message3; - this.publisher.publish(syncPublication, message3, message1, message2); - return; + } catch (Throwable e) { + switch (messageType) { + case MessageType.ONE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(event.message1)); + return; + } + case MessageType.TWO: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(event.message1, event.message2)); + return; + } + case MessageType.THREE: { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + .setCause(e) + .setPublishedObject(event.message1, + event.message2, + event.message3)); + //noinspection UnnecessaryReturnStatement + return; + } } } } @@ -69,7 +95,6 @@ class MessageHandler implements WorkHandler, LifecycleAware { @Override public void onStart() { - } @Override