From fb985b6ef121d0baafc9b656c17aaddd00c4c713 Mon Sep 17 00:00:00 2001 From: nathan Date: Thu, 5 Feb 2015 03:52:31 +0100 Subject: [PATCH] Using the disruptor to deliver messages instead of a BlockingQueue+threads --- src/main/java/net/engio/mbassy/MBassador.java | 158 +++++++++++------- .../mbassy/bus/AbstractPubSubSupport.java | 2 +- .../mbassy/disruptor/EventBusFactory.java | 16 ++ .../mbassy/disruptor/EventProcessor.java | 31 ++++ .../engio/mbassy/disruptor/MessageHolder.java | 9 + 5 files changed, 155 insertions(+), 61 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/disruptor/EventBusFactory.java create mode 100644 src/main/java/net/engio/mbassy/disruptor/EventProcessor.java create mode 100644 src/main/java/net/engio/mbassy/disruptor/MessageHolder.java diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java index ff17109..520f61d 100644 --- a/src/main/java/net/engio/mbassy/MBassador.java +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -1,82 +1,92 @@ package net.engio.mbassy; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import net.engio.mbassy.bus.AbstractPubSubSupport; import net.engio.mbassy.bus.error.PublicationError; +import net.engio.mbassy.disruptor.EventBusFactory; +import net.engio.mbassy.disruptor.EventProcessor; +import net.engio.mbassy.disruptor.MessageHolder; + +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; /** * The base class for all message bus implementations with support for asynchronous message dispatch */ public class MBassador extends AbstractPubSubSupport implements IMessageBus { - private final int numberOfMessageDispatchers; - - // all threads that are available for asynchronous message dispatching - private final List dispatchers; - - // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue pendingMessages = new LinkedBlockingQueue(Integer.MAX_VALUE / 16); - - protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() { - + /** + * The stack size is arbitrary based on JVM implementation. Default is 0 + * 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k + *

+ * To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit) + * Setting the size MAY or MAY NOT have any effect!!! + *

+ * Stack size must be specified in bytes. Default is 8k + */ + public static int stackSizeForThreads = 8192; + private static final ThreadFactory namedThreadFactory = new ThreadFactory() { private final AtomicInteger threadID = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { - Thread thread = Executors.defaultThreadFactory().newThread(r); - thread.setDaemon(true);// do not prevent the JVM from exiting - thread.setName("Dispatcher-" + this.threadID.getAndIncrement()); - return thread; + + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("MessageBus-"); + stringBuilder.append(this.threadID.getAndIncrement()); + + // stack size is arbitrary based on JVM implementation. Default is 0 + // 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k + // To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit) + // Setting the size MAY or MAY NOT have any effect!!! + Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, stringBuilder.toString(), stackSizeForThreads); + t.setDaemon(true);// do not prevent the JVM from exiting + t.setPriority(Thread.NORM_PRIORITY); + return t; } }; + // any new thread will be 'NON-DAEMON', so that it will be allowed to finish it's task before permitting the JVM to shut down + private final ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory); + + private final int ringBufferSize = 2048; + + private final Disruptor disruptor; + private final RingBuffer ringBuffer; + + public MBassador() { - this(6); + this(Runtime.getRuntime().availableProcessors() - 1); } - public MBassador(int numberOfMessageDispatchers) { + // must be power of 2. + public MBassador(int numberOfThreads) { super(); - this.numberOfMessageDispatchers = numberOfMessageDispatchers; - this.dispatchers = new ArrayList(numberOfMessageDispatchers); - initDispatcherThreads(); - } - - // initialize the dispatch workers - private void initDispatcherThreads() { - for (int i = 0; i < this.numberOfMessageDispatchers; i++) { - // each thread will run forever and process incoming - // message publication requests - Thread dispatcher = MessageDispatchThreadFactory.newThread(new Runnable() { - @Override - public void run() { - Object message = null; - while (true) { - try { - message = MBassador.this.pendingMessages.take(); - publishMessage(message); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } catch (Throwable t) { - handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message)); - } - } - } - }); - dispatcher.setName("Message dispatcher"); - this.dispatchers.add(dispatcher); - dispatcher.start(); + if (numberOfThreads < 1) { + numberOfThreads = 1; // at LEAST 1 thread. } + + EventBusFactory factory = new EventBusFactory(); + EventProcessor procs[] = new EventProcessor[numberOfThreads]; + for (int i = 0; i < procs.length; i++) { + procs[i] = new EventProcessor(this, i, procs.length); + } + + this.disruptor = new Disruptor(factory, this.ringBufferSize, this.executor, ProducerType.MULTI, new SleepingWaitStrategy()); + + // tell the disruptor to handle procs first, then results. IN ORDER. + this.disruptor.handleEventsWith(procs); + this.ringBuffer = this.disruptor.start(); } @@ -94,19 +104,48 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { @Override public void publishAsync(Object message) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + + // setup the job + final long seq = ringBuffer.next(); try { - this.pendingMessages.put(message); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.message = message; + } catch (Exception e) { + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message", message)); + } finally { + // always publish the job + ringBuffer.publish(seq); } } @Override public void publishAsync(long timeout, TimeUnit unit, Object message) { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); + + // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space + // to become available. + while (!ringBuffer.hasAvailableCapacity(1)) { + LockSupport.parkNanos(10L); + if (expireTimestamp <= System.currentTimeMillis()) { + handlePublicationError(new PublicationError(new Exception("Timeout"), "Error while adding an asynchronous message", message)); + return; + } + } + + // setup the job + final long seq = ringBuffer.next(); try { - this.pendingMessages.offer(message, timeout, unit); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.message = message; + } catch (Exception e) { + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message", message)); + } finally { + // always publish the job + ringBuffer.publish(seq); } } @@ -118,13 +157,12 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus { @Override public void shutdown() { - for (Thread dispatcher : this.dispatchers) { - dispatcher.interrupt(); - } + this.disruptor.shutdown(); + this.executor.shutdown(); } @Override public boolean hasPendingMessages() { - return this.pendingMessages.size() > 0; + return this.ringBuffer.remainingCapacity() != this.ringBufferSize; } } diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index 8fd4200..4361908 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -52,7 +52,7 @@ public abstract class AbstractPubSubSupport implements PubSubSupport, ErrorHandl } } - protected void publishMessage(Object message) { + public void publishMessage(Object message) { Class messageClass = message.getClass(); // TODO: convert this to have N number of message types diff --git a/src/main/java/net/engio/mbassy/disruptor/EventBusFactory.java b/src/main/java/net/engio/mbassy/disruptor/EventBusFactory.java new file mode 100644 index 0000000..2f75163 --- /dev/null +++ b/src/main/java/net/engio/mbassy/disruptor/EventBusFactory.java @@ -0,0 +1,16 @@ +package net.engio.mbassy.disruptor; + +import net.engio.mbassy.bus.AbstractPubSubSupport; + +import com.lmax.disruptor.EventFactory; + +public class EventBusFactory implements EventFactory { + + public EventBusFactory() { + } + + @Override + public MessageHolder newInstance() { + return new MessageHolder(); + } +} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java b/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java new file mode 100644 index 0000000..b419438 --- /dev/null +++ b/src/main/java/net/engio/mbassy/disruptor/EventProcessor.java @@ -0,0 +1,31 @@ +package net.engio.mbassy.disruptor; + +import net.engio.mbassy.bus.AbstractPubSubSupport; +import net.engio.mbassy.bus.error.PublicationError; + +import com.lmax.disruptor.EventHandler; + +public class EventProcessor implements EventHandler { + private final AbstractPubSubSupport publisher; + + private final long ordinal; + private final long numberOfConsumers; + + public EventProcessor(AbstractPubSubSupport publisher, final long ordinal, final long numberOfConsumers) { + this.publisher = publisher; + this.ordinal = ordinal; + this.numberOfConsumers = numberOfConsumers; + } + + @Override + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { + if (sequence % this.numberOfConsumers == this.ordinal) { + try { + this.publisher.publishMessage(event.message); + } catch (Throwable t) { + this.publisher.handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", event.message)); + } + event.message = null; // cleanup + } + } +} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java b/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java new file mode 100644 index 0000000..076d95f --- /dev/null +++ b/src/main/java/net/engio/mbassy/disruptor/MessageHolder.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.disruptor; + + +public class MessageHolder { + public Object message; + + public MessageHolder() { + } +} \ No newline at end of file