From 3c021f383caa2af9f388cce2258b8a7435c658cd Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 1 Mar 2015 02:23:41 +0100 Subject: [PATCH] WIP disruptor. WIP simple queue --- .../com/lmax/disruptor/DisruptorQueue.java | 104 +++++ .../com/lmax/disruptor/EventBusFactory.java | 18 + .../com/lmax/disruptor/EventProcessor2.java | 53 +++ .../lmax/disruptor}/MessageHolder.java | 10 +- .../lmax/disruptor}/MessageType.java | 2 +- .../PublicationExceptionHandler.java | 35 ++ .../WaitingMultiProducerSequencer.java | 371 ++++++++++++++++++ .../lmax/disruptor/WaitingWorkProcessor.java | 208 ++++++++++ .../com/lmax/disruptor/WaitingWorkerPool.java | 150 +++++++ .../disruptor/WorkHandlerEarlyRelease.java | 37 ++ .../util/messagebus/MultiMBassador.java | 46 ++- .../common/simpleq/SimpleQueue.java | 109 +++++ .../subscription/EventProcessor.java | 47 --- .../util/messagebus/PerformanceTest.java | 19 +- 14 files changed, 1131 insertions(+), 78 deletions(-) create mode 100644 src/main/java/com/lmax/disruptor/DisruptorQueue.java create mode 100644 src/main/java/com/lmax/disruptor/EventBusFactory.java create mode 100644 src/main/java/com/lmax/disruptor/EventProcessor2.java rename src/main/java/{dorkbox/util/messagebus/subscription => com/lmax/disruptor}/MessageHolder.java (64%) rename src/main/java/{dorkbox/util/messagebus/subscription => com/lmax/disruptor}/MessageType.java (70%) create mode 100644 src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java create mode 100644 src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java create mode 100644 src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java create mode 100644 src/main/java/com/lmax/disruptor/WaitingWorkerPool.java create mode 100644 src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java create mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java delete mode 100644 src/main/java/dorkbox/util/messagebus/subscription/EventProcessor.java diff --git a/src/main/java/com/lmax/disruptor/DisruptorQueue.java b/src/main/java/com/lmax/disruptor/DisruptorQueue.java new file mode 100644 index 0000000..a6bd2cc --- /dev/null +++ b/src/main/java/com/lmax/disruptor/DisruptorQueue.java @@ -0,0 +1,104 @@ +package com.lmax.disruptor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import dorkbox.util.messagebus.MultiMBassador; +import dorkbox.util.messagebus.common.NamedThreadFactory; + + + +public class DisruptorQueue { + + + private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); + + private final ExecutorService executor; + + // must be power of 2. + private final int ringBufferSize = 1; + + private final RingBuffer ringBuffer; + private WaitingWorkerPool workerPool; + + public DisruptorQueue(MultiMBassador mbassador, int numberOfThreads) { +// numberOfThreads = 4; + + // only used to startup threads, can be replaced with static list of threads + this.executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0, + TimeUnit.NANOSECONDS, + new LinkedTransferQueue(), + new NamedThreadFactory("disruptor")); + + + + + EventBusFactory factory = new EventBusFactory(); + PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(mbassador); + + WorkHandlerEarlyRelease handlers[] = new EventProcessor2[numberOfThreads]; + for (int i = 0; i < handlers.length; i++) { + handlers[i] = new EventProcessor2(mbassador); + } + + WaitStrategy consumerWaitStrategy = new BlockingWaitStrategy(); + WaitingMultiProducerSequencer sequencer = new WaitingMultiProducerSequencer(4, loggingExceptionHandler, consumerWaitStrategy); + this.ringBuffer = new RingBuffer(factory, sequencer); + + + SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier(); + this.workerPool = new WaitingWorkerPool(this.ringBuffer, + sequencer, + sequenceBarrier, + loggingExceptionHandler, + handlers); + + sequencer.addGatingSequences(this.workerPool.getWorkerSequences()); // to notify our consumers (if they are blocking) of a new element + this.workerPool.start(this.executor); + } + + public void transfer(Object message1) throws InterruptedException { + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.ringBuffer; + + // setup the job + final long seq = ringBuffer.next(); + try { +// System.err.println("+(" + seq + ") " + message1); + MessageHolder eventJob = ringBuffer.get(seq); + eventJob.messageType = MessageType.ONE; + eventJob.message1 = message1; +// eventJob.message2 = message2; +// eventJob.message3 = message3; + } catch (Exception e) { + e.printStackTrace(); +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message1)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + } + + public boolean hasPendingMessages() { + final long cursor = this.ringBuffer.getCursor(); + Sequence[] workerSequences = this.workerPool.getWorkerSequences(); + for (Sequence s : workerSequences) { + if (cursor > s.get()) + { + return true; + } + } + + return false; + } + + public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + + } +} diff --git a/src/main/java/com/lmax/disruptor/EventBusFactory.java b/src/main/java/com/lmax/disruptor/EventBusFactory.java new file mode 100644 index 0000000..c002ab7 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/EventBusFactory.java @@ -0,0 +1,18 @@ +package com.lmax.disruptor; + +import com.lmax.disruptor.EventFactory; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public class EventBusFactory implements EventFactory { + + public EventBusFactory() { + } + + @Override + public MessageHolder newInstance() { + return new MessageHolder(); + } +} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/EventProcessor2.java b/src/main/java/com/lmax/disruptor/EventProcessor2.java new file mode 100644 index 0000000..9795f16 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/EventProcessor2.java @@ -0,0 +1,53 @@ +package com.lmax.disruptor; + +import dorkbox.util.messagebus.PubSubSupport; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public class EventProcessor2 implements WorkHandlerEarlyRelease { + private final PubSubSupport publisher; + private WaitingWorkProcessor workProcessor; + + public EventProcessor2(PubSubSupport publisher) { + this.publisher = publisher; + } + + @Override + public void setProcessor(WaitingWorkProcessor workProcessor) { +// this.workProcessor = workProcessor; + } + + @Override + public void onEvent(long sequence, MessageHolder event) throws Exception { + MessageType messageType = event.messageType; + switch (messageType) { + case ONE: { + Object message1 = event.message1; +// System.err.println("(" + sequence + ")" + message1); +// this.workProcessor.release(sequence); + this.publisher.publish(message1); + return; + } + case TWO: { + Object message1 = event.message1; + Object message2 = event.message2; + this.publisher.publish(message1, message2); + return; + } + case THREE: { + Object message1 = event.message1; + Object message2 = event.message2; + Object message3 = event.message3; + this.publisher.publish(message1, message2, message3); + return; + } + case ARRAY: { + Object[] messages = event.messages; + this.publisher.publish(messages); + return; + } + } + } +} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/subscription/MessageHolder.java b/src/main/java/com/lmax/disruptor/MessageHolder.java similarity index 64% rename from src/main/java/dorkbox/util/messagebus/subscription/MessageHolder.java rename to src/main/java/com/lmax/disruptor/MessageHolder.java index 4e561c3..3932f51 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/MessageHolder.java +++ b/src/main/java/com/lmax/disruptor/MessageHolder.java @@ -1,9 +1,8 @@ -package dorkbox.util.messagebus.subscription; +package com.lmax.disruptor; /** - * @author dorkbox, llc - * Date: 2/2/15 + * @author dorkbox, llc Date: 2/2/15 */ public class MessageHolder { public MessageType messageType = MessageType.ONE; @@ -13,6 +12,5 @@ public class MessageHolder { public Object message3 = null; public Object[] messages = null; - public MessageHolder() { - } -} \ No newline at end of file + public MessageHolder() {} +} diff --git a/src/main/java/dorkbox/util/messagebus/subscription/MessageType.java b/src/main/java/com/lmax/disruptor/MessageType.java similarity index 70% rename from src/main/java/dorkbox/util/messagebus/subscription/MessageType.java rename to src/main/java/com/lmax/disruptor/MessageType.java index 657794b..dda4777 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/MessageType.java +++ b/src/main/java/com/lmax/disruptor/MessageType.java @@ -1,4 +1,4 @@ -package dorkbox.util.messagebus.subscription; +package com.lmax.disruptor; /** * @author dorkbox, llc * Date: 2/2/15 diff --git a/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java b/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java new file mode 100644 index 0000000..f1fdb42 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java @@ -0,0 +1,35 @@ +package com.lmax.disruptor; + +import com.lmax.disruptor.ExceptionHandler; + +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.error.PublicationError; + +public final class PublicationExceptionHandler implements ExceptionHandler { + private final ErrorHandlingSupport errorHandler; + + public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) { + this.errorHandler = errorHandler; + } + + @Override + public void handleEventException(final Throwable e, final long sequence, final Object event) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")") + .setCause(e)); + } + + @Override + public void handleOnStartException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error starting the disruptor") + .setCause(e)); + } + + @Override + public void handleOnShutdownException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error stopping the disruptor") + .setCause(e)); + } +} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java b/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java new file mode 100644 index 0000000..4b5413b --- /dev/null +++ b/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java @@ -0,0 +1,371 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; + +import sun.misc.Unsafe; + +import com.lmax.disruptor.util.Util; + + +/** + *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s. + * Suitable for use for sequencing across multiple publisher threads.

+ * + *

*

Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call + * to {@link Sequencer#next()}, to determine the highest available sequence that can be read, then + * {@link Sequencer#getHighestPublishedSequence(long, long)} should be used. + */ +public final class WaitingMultiProducerSequencer extends AbstractSequencer implements WaitStrategy +{ + private static final Unsafe UNSAFE = Util.getUnsafe(); + private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); + private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); + + private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + private final Lock lock = new ReentrantLock(); + private final Condition processorNotifyCondition = this.lock.newCondition(); + + private final AtomicBoolean running = new AtomicBoolean(true); + private final ExceptionHandler exceptionHandler; + + // availableBuffer tracks the state of each ringbuffer slot + // see below for more details on the approach + private final int[] availableBuffer; + private final int indexMask; + private final int indexShift; + + /** + * Construct a Sequencer with the selected wait strategy and buffer size. + * + * @param bufferSize the size of the buffer that this will sequence over. + * @param producerWaitStrategy for those producing sequences + * @param consumerWaitStrategy for those waiting on sequences. + */ + public WaitingMultiProducerSequencer(int bufferSize, ExceptionHandler exceptionHandler, WaitStrategy consumerWaitStrategy) + { + super(bufferSize, consumerWaitStrategy); + this.exceptionHandler = exceptionHandler; + + this.availableBuffer = new int[bufferSize]; + this.indexMask = bufferSize - 1; + this.indexShift = Util.log2(bufferSize); + initialiseAvailableBuffer(); + } + + public void halt() { + this.running.set(false); + signalAllWhenBlocking(); + } + + /** + * @see Sequencer#hasAvailableCapacity(int) + */ + @Override + public boolean hasAvailableCapacity(final int requiredCapacity) + { + return hasAvailableCapacity(this.gatingSequences, requiredCapacity, this.cursor.get()); + } + + private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) + { + long wrapPoint = cursorValue + requiredCapacity - this.bufferSize; + long cachedGatingSequence = this.gatingSequenceCache.get(); + + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) + { + long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue); + this.gatingSequenceCache.set(minSequence); + + if (wrapPoint > minSequence) + { + return false; + } + } + + return true; + } + + /** + * @see Sequencer#claim(long) + */ + @Override + public void claim(long sequence) + { + this.cursor.set(sequence); + } + + /** + * @see Sequencer#next() + */ + @Override + public long next() + { + return next(1); + } + + /** + * @see Sequencer#next(int) + */ + @Override + public long next(int n) + { + if (n < 1) + { + throw new IllegalArgumentException("n must be > 0"); + } + + long current; + long next; + + do + { + current = this.cursor.get(); + next = current + n; + + long wrapPoint = next - this.bufferSize; + long cachedGatingSequence = this.gatingSequenceCache.get(); + + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) + { + long gatingSequence = Util.getMinimumSequence(this.gatingSequences, current); + + if (wrapPoint > gatingSequence) + { +// try { +// waitFor(gatingSequence+1, this.cursor, null, null); // as soon as a spot frees by a consumer, continue +// } +// catch (final AlertException ex) +// { +// if (!this.running.get()) +// { +// break; +// } +// } catch (final Throwable ex) +// { +// this.exceptionHandler.handleEventException(ex, next, null); +// break; +// } + + LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? + continue; + } + + this.gatingSequenceCache.set(gatingSequence); + } + else if (this.cursor.compareAndSet(current, next)) + { + break; + } + } + while (true); + + return next; + } + + /** + * @see Sequencer#tryNext() + */ + @Override + public long tryNext() throws InsufficientCapacityException + { + return tryNext(1); + } + + /** + * @see Sequencer#tryNext(int) + */ + @Override + public long tryNext(int n) throws InsufficientCapacityException + { + if (n < 1) + { + throw new IllegalArgumentException("n must be > 0"); + } + + long current; + long next; + + do + { + current = this.cursor.get(); + next = current + n; + + if (!hasAvailableCapacity(this.gatingSequences, n, current)) + { + throw InsufficientCapacityException.INSTANCE; + } + } + while (!this.cursor.compareAndSet(current, next)); + + return next; + } + + /** + * @see Sequencer#remainingCapacity() + */ + @Override + public long remainingCapacity() + { + long consumed = Util.getMinimumSequence(this.gatingSequences, this.cursor.get()); + long produced = this.cursor.get(); + return getBufferSize() - (produced - consumed); + } + + private void initialiseAvailableBuffer() + { + for (int i = this.availableBuffer.length - 1; i != 0; i--) + { + setAvailableBufferValue(i, -1); + } + + setAvailableBufferValue(0, -1); + } + + /** + * @see Sequencer#publish(long) + */ + @Override + public void publish(final long sequence) + { + setAvailable(sequence); + this.waitStrategy.signalAllWhenBlocking(); + } + + /** + * @see Sequencer#publish(long, long) + */ + @Override + public void publish(long lo, long hi) + { + for (long l = lo; l <= hi; l++) + { + setAvailable(l); + } + this.waitStrategy.signalAllWhenBlocking(); + } + + /** + * The below methods work on the availableBuffer flag. + * + * The prime reason is to avoid a shared sequence object between publisher threads. + * (Keeping single pointers tracking start and end would require coordination + * between the threads). + * + * -- Firstly we have the constraint that the delta between the cursor and minimum + * gating sequence will never be larger than the buffer size (the code in + * next/tryNext in the Sequence takes care of that). + * -- Given that; take the sequence value and mask off the lower portion of the + * sequence as the index into the buffer (indexMask). (aka modulo operator) + * -- The upper portion of the sequence becomes the value to check for availability. + * ie: it tells us how many times around the ring buffer we've been (aka division) + * -- Because we can't wrap without the gating sequences moving forward (i.e. the + * minimum gating sequence is effectively our last available position in the + * buffer), when we have new data and successfully claimed a slot we can simply + * write over the top. + */ + private void setAvailable(final long sequence) + { + setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); + } + + private void setAvailableBufferValue(int index, int flag) + { + long bufferAddress = index * SCALE + BASE; + UNSAFE.putOrderedInt(this.availableBuffer, bufferAddress, flag); + } + + /** + * @see Sequencer#isAvailable(long) + */ + @Override + public boolean isAvailable(long sequence) + { + int index = calculateIndex(sequence); + int flag = calculateAvailabilityFlag(sequence); + long bufferAddress = index * SCALE + BASE; + return UNSAFE.getIntVolatile(this.availableBuffer, bufferAddress) == flag; + } + + @Override + public long getHighestPublishedSequence(long lowerBound, long availableSequence) + { + for (long sequence = lowerBound; sequence <= availableSequence; sequence++) + { + if (!isAvailable(sequence)) + { + return sequence - 1; + } + } + + return availableSequence; + } + + private int calculateAvailabilityFlag(final long sequence) + { + return (int) (sequence >>> this.indexShift); + } + + private int calculateIndex(final long sequence) + { + return (int) sequence & this.indexMask; + } + + @Override + public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) + throws AlertException, InterruptedException + { + long availableSequence; + + if ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence) + { + this.lock.lock(); + try + { + while ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence && this.running.get()) + { + this.processorNotifyCondition.await(); + } + } + finally + { + this.lock.unlock(); + } + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() + { + this.lock.lock(); + try + { + this.processorNotifyCondition.signalAll(); + } + finally + { + this.lock.unlock(); + } + } +} diff --git a/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java b/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java new file mode 100644 index 0000000..1da5680 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java @@ -0,0 +1,208 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + *

A {@link WaitingWorkProcessor} wraps a single {@link WorkHandler}, effectively consuming the sequence + * and ensuring appropriate barriers.

+ * + *

Generally, this will be used as part of a {@link WaitingWorkerPool}.

+ * + * @param event implementation storing the details for the work to processed. + */ +public final class WaitingWorkProcessor + implements EventProcessor +{ + private final AtomicBoolean running = new AtomicBoolean(false); + private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + private final RingBuffer ringBuffer; + private final SequenceBarrier sequenceBarrier; + private final WorkHandlerEarlyRelease workHandler; + private final ExceptionHandler exceptionHandler; + private final Sequence workSequence; + + private final EventReleaser eventReleaser = new EventReleaser() + { + @Override + public void release() + { + WaitingWorkProcessor.this.sequence.set(Long.MAX_VALUE); + } + }; + private WaitStrategy publisherStrategy; + + /** + * Construct a {@link WaitingWorkProcessor}. + * + * @param ringBuffer to which events are published. + * @param sequenceBarrier on which it is waiting. + * @param sequenceBarrier + * @param workHandler is the delegate to which events are dispatched. + * @param exceptionHandler to be called back when an error occurs + * @param workSequence from which to claim the next event to be worked on. It should always be initialised + * as {@link Sequencer#INITIAL_CURSOR_VALUE} + */ + public WaitingWorkProcessor(final RingBuffer ringBuffer, + final WaitStrategy publisherStrategy, + final SequenceBarrier sequenceBarrier, + final WorkHandlerEarlyRelease workHandler, + final ExceptionHandler exceptionHandler, + final Sequence workSequence) + { + this.ringBuffer = ringBuffer; + this.publisherStrategy = publisherStrategy; + this.sequenceBarrier = sequenceBarrier; + this.workHandler = workHandler; + this.exceptionHandler = exceptionHandler; + this.workSequence = workSequence; + + if (this.workHandler instanceof EventReleaseAware) + { + ((EventReleaseAware)this.workHandler).setEventReleaser(this.eventReleaser); + } + + workHandler.setProcessor(this); + } + + @Override + public Sequence getSequence() + { + return this.sequence; + } + + @Override + public void halt() + { + this.running.set(false); + this.sequenceBarrier.alert(); + } + + @Override + public boolean isRunning() + { + return this.running.get(); + } + + /** + * It is ok to have another thread re-run this method after a halt(). + * + * @throws IllegalStateException if this processor is already running + */ + @Override + public void run() + { + if (!this.running.compareAndSet(false, true)) + { + throw new IllegalStateException("Thread is already running"); + } + this.sequenceBarrier.clearAlert(); + + notifyStart(); + + WaitStrategy publisherStrategy = this.publisherStrategy; + boolean processedSequence = true; + long cachedAvailableSequence = Long.MIN_VALUE; + long nextSequence = this.sequence.get(); + T event = null; + while (true) + { + try + { + // if previous sequence was processed - fetch the next sequence and set + // that we have successfully processed the previous sequence + // typically, this will be true + // this prevents the sequence getting too far forward if an exception + // is thrown from the WorkHandler + if (processedSequence) + { + processedSequence = false; + do + { + nextSequence = this.workSequence.get() + 1L; + // this tells the producer that we are done with our sequence, and that it can reuse it's spot in the ring buffer + this.sequence.set(nextSequence - 1L); + } + while (!this.workSequence.compareAndSet(nextSequence - 1L, nextSequence)); +// publisherStrategy.signalAllWhenBlocking(); + } + + if (cachedAvailableSequence >= nextSequence) + { + event = this.ringBuffer.get(nextSequence); + this.workHandler.onEvent(nextSequence, event); + processedSequence = true; + } + else + { + cachedAvailableSequence = this.sequenceBarrier.waitFor(nextSequence); + } + } + catch (final AlertException ex) + { + if (!this.running.get()) + { + break; + } + } + catch (final Throwable ex) + { + // handle, mark as processed, unless the exception handler threw an exception + this.exceptionHandler.handleEventException(ex, nextSequence, event); + processedSequence = true; + } + } + + notifyShutdown(); + + this.running.set(false); + } + + private void notifyStart() + { + if (this.workHandler instanceof LifecycleAware) + { + try + { + ((LifecycleAware)this.workHandler).onStart(); + } + catch (final Throwable ex) + { + this.exceptionHandler.handleOnStartException(ex); + } + } + } + + private void notifyShutdown() + { + if (this.workHandler instanceof LifecycleAware) + { + try + { + ((LifecycleAware)this.workHandler).onShutdown(); + } + catch (final Throwable ex) + { + this.exceptionHandler.handleOnShutdownException(ex); + } + } + } + + public void release(long sequence) { + + } +} diff --git a/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java b/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java new file mode 100644 index 0000000..9568aa4 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java @@ -0,0 +1,150 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.lmax.disruptor.util.Util; + +/** + * WorkerPool contains a pool of {@link WaitingWorkProcessor}s that will consume sequences so jobs can be farmed out across a pool of workers. + * Each of the {@link WaitingWorkProcessor}s manage and calls a {@link WorkHandler} to process the events. + * + * @param event to be processed by a pool of workers + */ +public final class WaitingWorkerPool +{ + private final AtomicBoolean started = new AtomicBoolean(false); + private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + private final RingBuffer ringBuffer; + // WorkProcessors are created to wrap each of the provided WorkHandlers + private final WaitingWorkProcessor[] workProcessors; + + /** + * Create a worker pool to enable an array of {@link WorkHandler}s to consume published sequences. + * + * This option requires a pre-configured {@link RingBuffer} which must have {@link RingBuffer#addGatingSequences(Sequence...)} + * called before the work pool is started. + * + * @param ringBuffer of events to be consumed. + * @param sequenceBarrier on which the workers will depend. + * @param sequenceBarrier2 + * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s. + * @param workHandlers to distribute the work load across. + */ + public WaitingWorkerPool(final RingBuffer ringBuffer, + final WaitStrategy publisherStrategy, + final SequenceBarrier sequenceBarrier, + final ExceptionHandler exceptionHandler, + final WorkHandlerEarlyRelease... workHandlers) + { + this.ringBuffer = ringBuffer; + final int numWorkers = workHandlers.length; + this.workProcessors = new WaitingWorkProcessor[numWorkers]; + + for (int i = 0; i < numWorkers; i++) + { + this.workProcessors[i] = new WaitingWorkProcessor(ringBuffer, + publisherStrategy, + sequenceBarrier, + workHandlers[i], + exceptionHandler, + this.workSequence); + } + } + + /** + * Get an array of {@link Sequence}s representing the progress of the workers. + * + * @return an array of {@link Sequence}s representing the progress of the workers. + */ + public Sequence[] getWorkerSequences() + { +// final Sequence[] sequences = new Sequence[this.workProcessors.length + 1]; + final Sequence[] sequences = new Sequence[this.workProcessors.length]; + for (int i = 0, size = this.workProcessors.length; i < size; i++) + { + sequences[i] = this.workProcessors[i].getSequence(); + } +// sequences[sequences.length - 1] = this.workSequence; + + return sequences; + } + + /** + * Start the worker pool processing events in sequence. + * + * @param executor providing threads for running the workers. + * @return the {@link RingBuffer} used for the work queue. + * @throws IllegalStateException if the pool has already been started and not halted yet + */ + public RingBuffer start(final Executor executor) + { + if (!this.started.compareAndSet(false, true)) + { + throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted."); + } + + final long cursor = this.ringBuffer.getCursor(); + this.workSequence.set(cursor); + + for (WaitingWorkProcessor processor : this.workProcessors) + { + processor.getSequence().set(cursor); + executor.execute(processor); + } + + return this.ringBuffer; + } + + /** + * Wait for the {@link RingBuffer} to drain of published events then halt the workers. + */ + public void drainAndHalt() + { + Sequence[] workerSequences = getWorkerSequences(); + while (this.ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences)) + { + Thread.yield(); + } + + for (WaitingWorkProcessor processor : this.workProcessors) + { + processor.halt(); + } + + this.started.set(false); + } + + /** + * Halt all workers immediately at the end of their current cycle. + */ + public void halt() + { + for (WaitingWorkProcessor processor : this.workProcessors) + { + processor.halt(); + } + + this.started.set(false); + } + + public boolean isRunning() + { + return this.started.get(); + } +} diff --git a/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java b/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java new file mode 100644 index 0000000..21492e0 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +import com.lmax.disruptor.RingBuffer; + +/** + * Callback interface to be implemented for processing units of work as they become available in the {@link RingBuffer}. + * + * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. + * @see WaitingWorkerPool + */ +public interface WorkHandlerEarlyRelease +{ + /** + * Callback to indicate a unit of work needs to be processed. + * + * @param event published to the {@link RingBuffer} + * @throws Exception if the {@link WorkHandlerEarlyRelease} would like the exception handled further up the chain. + */ + void onEvent(long sequence, T event) throws Exception; + + void setProcessor(WaitingWorkProcessor workProcessor); +} diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 2e74c03..28331fb 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -6,10 +6,11 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import com.lmax.disruptor.MessageHolder; + import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.LinkedTransferQueue; import dorkbox.util.messagebus.common.NamedThreadFactory; -import dorkbox.util.messagebus.common.TransferQueue; +import dorkbox.util.messagebus.common.simpleq.SimpleQueue; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -40,7 +41,9 @@ public class MultiMBassador implements IMessageBus { // this handler will receive all errors that occur during message dispatch or message handling private final Collection errorHandlers = new ArrayDeque(); - private final TransferQueue dispatchQueue = new LinkedTransferQueue(); +// private final TransferQueue dispatchQueue = new LinkedTransferQueue(); +// private final DisruptorQueue dispatchQueue; + private final SimpleQueue dispatchQueue; private final SubscriptionManager subscriptionManager; @@ -61,7 +64,7 @@ public class MultiMBassador implements IMessageBus { */ public MultiMBassador() { this(Runtime.getRuntime().availableProcessors()); -// this(4); +// this(2); } /** @@ -84,6 +87,9 @@ public class MultiMBassador implements IMessageBus { } this.numberOfThreads = numberOfThreads; +// this.dispatchQueue = new DisruptorQueue(this, numberOfThreads); + this.dispatchQueue = new SimpleQueue(numberOfThreads); + this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.threads = new ArrayDeque(numberOfThreads); @@ -94,8 +100,10 @@ public class MultiMBassador implements IMessageBus { @SuppressWarnings("null") @Override public void run() { - TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; - Runnable event = null; +// TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; + SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; +// Runnable event = null; + MessageHolder event = null; int spins; while (true) { @@ -113,7 +121,11 @@ public class MultiMBassador implements IMessageBus { } } - event.run(); + Object message1 = event.message1; + IN_QUEUE.release(event); +// event.run(); + publish(message1); + } catch (InterruptedException e) { return; } @@ -125,8 +137,6 @@ public class MultiMBassador implements IMessageBus { this.threads.add(runner); runner.start(); } - - } @Override @@ -157,7 +167,8 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { - return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads; +// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads; + return this.dispatchQueue.hasPendingMessages(); } @Override @@ -396,15 +407,16 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(final Object message) { if (message != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message); +// } +// }; try { - this.dispatchQueue.transfer(runnable); +// this.dispatchQueue.transfer(runnable); + this.dispatchQueue.transfer(message); } catch (InterruptedException e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java new file mode 100644 index 0000000..f21e018 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -0,0 +1,109 @@ +package dorkbox.util.messagebus.common.simpleq; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.lmax.disruptor.MessageHolder; + + + +public class SimpleQueue { + + + private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); + + private final Lock publisherLock = new ReentrantLock(); + private final Condition publisherNotifyCondition = this.publisherLock.newCondition(); + + private final Lock consumerLock = new ReentrantLock(); + private final Condition consumerNotifyCondition = this.consumerLock.newCondition(); + + private final AtomicReference consumer = new AtomicReference(); + private final AtomicReference producer = new AtomicReference(); + + private volatile boolean waitingProducer = false; + private final AtomicInteger availableThreads = new AtomicInteger(); + + + public SimpleQueue(int numberOfThreads) { + this.availableThreads.set(numberOfThreads); + this.producer.set(new MessageHolder()); + } + + public void transfer(Object message1) throws InterruptedException { + MessageHolder holder = null; + + if ((holder = this.producer.getAndSet(null)) == null) { + this.publisherLock.lock(); + try { + while ((holder = this.producer.getAndSet(null)) == null) { + this.waitingProducer = true; + this.publisherNotifyCondition.await(); + } + } finally { + this.publisherLock.unlock(); + } + } + + holder.message1 = message1; + this.consumer.set(holder); + + this.consumerLock.lock(); + try { + this.consumerNotifyCondition.signalAll(); + } finally { + this.consumerLock.unlock(); + } + } + + public boolean hasPendingMessages() { + return false; + } + + public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + + } + + public MessageHolder poll() { + return this.consumer.getAndSet(null); + } + + public MessageHolder take() throws InterruptedException { + MessageHolder holder = null; + + this.consumerLock.lock(); + try { + while ((holder = this.consumer.getAndSet(null)) == null) { + this.consumerNotifyCondition.await(); + } + } finally { + this.consumerLock.unlock(); + } + + + return holder; + } + + // release the event back to the publisher + // notify publisher in case pub was waiting + public void release(MessageHolder holder) { + this.producer.set(holder); + + if (this.waitingProducer) { + this.publisherLock.lock(); + try { + if (this.waitingProducer) { + this.waitingProducer = false; + this.publisherNotifyCondition.signalAll(); + } + } finally { + this.publisherLock.unlock(); + } + } + } +} diff --git a/src/main/java/dorkbox/util/messagebus/subscription/EventProcessor.java b/src/main/java/dorkbox/util/messagebus/subscription/EventProcessor.java deleted file mode 100644 index e45cb71..0000000 --- a/src/main/java/dorkbox/util/messagebus/subscription/EventProcessor.java +++ /dev/null @@ -1,47 +0,0 @@ -package dorkbox.util.messagebus.subscription; - -import com.lmax.disruptor.WorkHandler; - -import dorkbox.util.messagebus.PubSubSupport; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class EventProcessor implements WorkHandler { - private final PubSubSupport publisher; - - public EventProcessor(PubSubSupport publisher) { - this.publisher = publisher; - } - - @Override - public void onEvent(MessageHolder event) throws Exception { - MessageType messageType = event.messageType; - switch (messageType) { - case ONE: { - this.publisher.publish(event.message1); - event.message1 = null; // cleanup - return; - } - case TWO: { - this.publisher.publish(event.message1, event.message2); - event.message1 = null; // cleanup - event.message2 = null; // cleanup - return; - } - case THREE: { - this.publisher.publish(event.message1, event.message2, event.message3); - event.message1 = null; // cleanup - event.message2 = null; // cleanup - event.message3 = null; // cleanup - return; - } - case ARRAY: { - this.publisher.publish(event.messages); - event.messages = null; // cleanup - return; - } - } - } -} \ No newline at end of file diff --git a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java index 31695aa..51a024f 100644 --- a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java @@ -3,11 +3,10 @@ */ package dorkbox.util.messagebus; -import dorkbox.util.messagebus.MultiMBassador; +import junit.framework.Assert; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; -import junit.framework.Assert; /** * @author dorkbox, llc Date: 2/2/15 @@ -41,9 +40,10 @@ public class PerformanceTest { Listener listener1 = new Listener(); bus.subscribe(listener1); - long num = Long.MAX_VALUE; - while (num-- > 0) { - bus.publishAsync("s"); + + long num = 0; + while (num < Long.MAX_VALUE) { + bus.publishAsync(num++); } // bus.publish("s", "s"); @@ -60,11 +60,16 @@ public class PerformanceTest { @SuppressWarnings("unused") public static class Listener { @Handler - public void handleSync(String o1) { + public void handleSync(Long o1) { count++; -// System.err.println("match String"); } +// @Handler +// public void handleSync(String o1) { +// count++; +//// System.err.println("match String"); +// } + // @Handler // public void handleSync(String o1, String o2) { // count.getAndIncrement();