From 1a591165a869957c4d2e88714298133ef7ba5132 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 6 May 2015 20:47:02 +0200 Subject: [PATCH] WIP - pre-lib for jctools --- .../com/lmax/disruptor/DisruptorQueue.java | 104 -- .../com/lmax/disruptor/EventBusFactory.java | 18 - .../com/lmax/disruptor/EventProcessor2.java | 53 - .../com/lmax/disruptor/MessageHolder.java | 16 - .../com/lmax/disruptor/MessageTypeOLD.java | 8 - .../PublicationExceptionHandler.java | 35 - .../WaitingMultiProducerSequencer.java | 371 ------- .../lmax/disruptor/WaitingWorkProcessor.java | 208 ---- .../com/lmax/disruptor/WaitingWorkerPool.java | 150 --- .../disruptor/WorkHandlerEarlyRelease.java | 37 - .../util/messagebus/MultiMBassador.java | 82 +- .../messagebus/common/ReflectionUtils.java | 7 +- .../common/simpleq}/MessageType.java | 2 +- .../jctools/MpmcArrayTransferQueue.java | 226 ---- .../jctools/MpmcTransferArrayQueue.java | 968 ++++++++++++++++++ .../common/simpleq/jctools/Node.java | 3 +- .../common/simpleq/jctools/SimpleQueue.java | 525 ---------- ...nsferQueueConcurrentNonBlockPerfTest.java} | 42 +- .../util/messagebus/MpmcNonBlockPerfTest.java | 167 +++ .../util/messagebus/MpmcQueueAltPerfTest.java | 103 -- ...erfTest.java => MpmcTransferPerfTest.java} | 22 +- 21 files changed, 1214 insertions(+), 1933 deletions(-) delete mode 100644 src/main/java/com/lmax/disruptor/DisruptorQueue.java delete mode 100644 src/main/java/com/lmax/disruptor/EventBusFactory.java delete mode 100644 src/main/java/com/lmax/disruptor/EventProcessor2.java delete mode 100644 src/main/java/com/lmax/disruptor/MessageHolder.java delete mode 100644 src/main/java/com/lmax/disruptor/MessageTypeOLD.java delete mode 100644 src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java delete mode 100644 src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java delete mode 100644 src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java delete mode 100644 src/main/java/com/lmax/disruptor/WaitingWorkerPool.java delete mode 100644 src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java rename src/main/java/{com/lmax/disruptor => dorkbox/util/messagebus/common/simpleq}/MessageType.java (83%) delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java create mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java rename src/test/java/dorkbox/util/messagebus/{MpmcQueueAltConcurrentPerfTest.java => LinkTransferQueueConcurrentNonBlockPerfTest.java} (76%) create mode 100644 src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java delete mode 100644 src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java rename src/test/java/dorkbox/util/messagebus/{SimpleQueueAltPerfTest.java => MpmcTransferPerfTest.java} (86%) diff --git a/src/main/java/com/lmax/disruptor/DisruptorQueue.java b/src/main/java/com/lmax/disruptor/DisruptorQueue.java deleted file mode 100644 index 48044d0..0000000 --- a/src/main/java/com/lmax/disruptor/DisruptorQueue.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.lmax.disruptor; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import dorkbox.util.messagebus.MultiMBassador; -import dorkbox.util.messagebus.common.NamedThreadFactory; - - - -public class DisruptorQueue { - - - private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); - - private final ExecutorService executor; - - // must be power of 2. - private final int ringBufferSize = 1; - - private final RingBuffer ringBuffer; - private WaitingWorkerPool workerPool; - - public DisruptorQueue(MultiMBassador mbassador, int numberOfThreads) { -// numberOfThreads = 4; - - // only used to startup threads, can be replaced with static list of threads - this.executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0, - TimeUnit.NANOSECONDS, - new LinkedTransferQueue(), - new NamedThreadFactory("disruptor")); - - - - - EventBusFactory factory = new EventBusFactory(); - PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(mbassador); - - WorkHandlerEarlyRelease handlers[] = new EventProcessor2[numberOfThreads]; - for (int i = 0; i < handlers.length; i++) { - handlers[i] = new EventProcessor2(mbassador); - } - - WaitStrategy consumerWaitStrategy = new BlockingWaitStrategy(); - WaitingMultiProducerSequencer sequencer = new WaitingMultiProducerSequencer(4, loggingExceptionHandler, consumerWaitStrategy); - this.ringBuffer = new RingBuffer(factory, sequencer); - - - SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier(); - this.workerPool = new WaitingWorkerPool(this.ringBuffer, - sequencer, - sequenceBarrier, - loggingExceptionHandler, - handlers); - - sequencer.addGatingSequences(this.workerPool.getWorkerSequences()); // to notify our consumers (if they are blocking) of a new element - this.workerPool.start(this.executor); - } - - public void transfer(Object message1) throws InterruptedException { - // put this on the disruptor ring buffer - final RingBuffer ringBuffer = this.ringBuffer; - - // setup the job - final long seq = ringBuffer.next(); - try { -// System.err.println("+(" + seq + ") " + message1); - MessageHolder eventJob = ringBuffer.get(seq); - eventJob.messageType = MessageTypeOLD.ONE; - eventJob.message1 = message1; -// eventJob.message2 = message2; -// eventJob.message3 = message3; - } catch (Exception e) { - e.printStackTrace(); -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message1)); - } finally { - // always publish the job - ringBuffer.publish(seq); - } - } - - public boolean hasPendingMessages() { - final long cursor = this.ringBuffer.getCursor(); - Sequence[] workerSequences = this.workerPool.getWorkerSequences(); - for (Sequence s : workerSequences) { - if (cursor > s.get()) - { - return true; - } - } - - return false; - } - - public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - - } -} diff --git a/src/main/java/com/lmax/disruptor/EventBusFactory.java b/src/main/java/com/lmax/disruptor/EventBusFactory.java deleted file mode 100644 index c002ab7..0000000 --- a/src/main/java/com/lmax/disruptor/EventBusFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.lmax.disruptor; - -import com.lmax.disruptor.EventFactory; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class EventBusFactory implements EventFactory { - - public EventBusFactory() { - } - - @Override - public MessageHolder newInstance() { - return new MessageHolder(); - } -} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/EventProcessor2.java b/src/main/java/com/lmax/disruptor/EventProcessor2.java deleted file mode 100644 index 5c3821e..0000000 --- a/src/main/java/com/lmax/disruptor/EventProcessor2.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.lmax.disruptor; - -import dorkbox.util.messagebus.PubSubSupport; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class EventProcessor2 implements WorkHandlerEarlyRelease { - private final PubSubSupport publisher; - private WaitingWorkProcessor workProcessor; - - public EventProcessor2(PubSubSupport publisher) { - this.publisher = publisher; - } - - @Override - public void setProcessor(WaitingWorkProcessor workProcessor) { -// this.workProcessor = workProcessor; - } - - @Override - public void onEvent(long sequence, MessageHolder event) throws Exception { - MessageTypeOLD messageType = event.messageType; - switch (messageType) { - case ONE: { - Object message1 = event.message1; -// System.err.println("(" + sequence + ")" + message1); -// this.workProcessor.release(sequence); - this.publisher.publish(message1); - return; - } - case TWO: { - Object message1 = event.message1; - Object message2 = event.message2; - this.publisher.publish(message1, message2); - return; - } - case THREE: { - Object message1 = event.message1; - Object message2 = event.message2; - Object message3 = event.message3; - this.publisher.publish(message1, message2, message3); - return; - } - case ARRAY: { - Object[] messages = event.messages; - this.publisher.publish(messages); - return; - } - } - } -} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/MessageHolder.java b/src/main/java/com/lmax/disruptor/MessageHolder.java deleted file mode 100644 index 63f6eb2..0000000 --- a/src/main/java/com/lmax/disruptor/MessageHolder.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.lmax.disruptor; - - -/** - * @author dorkbox, llc Date: 2/2/15 - */ -public class MessageHolder { - public MessageTypeOLD messageType = MessageTypeOLD.ONE; - - public Object message1 = null; - public Object message2 = null; - public Object message3 = null; - public Object[] messages = null; - - public MessageHolder() {} -} diff --git a/src/main/java/com/lmax/disruptor/MessageTypeOLD.java b/src/main/java/com/lmax/disruptor/MessageTypeOLD.java deleted file mode 100644 index be7c410..0000000 --- a/src/main/java/com/lmax/disruptor/MessageTypeOLD.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.lmax.disruptor; -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public enum MessageTypeOLD { - ONE, TWO, THREE, ARRAY -} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java b/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java deleted file mode 100644 index f1fdb42..0000000 --- a/src/main/java/com/lmax/disruptor/PublicationExceptionHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.lmax.disruptor; - -import com.lmax.disruptor.ExceptionHandler; - -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.error.PublicationError; - -public final class PublicationExceptionHandler implements ExceptionHandler { - private final ErrorHandlingSupport errorHandler; - - public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) { - this.errorHandler = errorHandler; - } - - @Override - public void handleEventException(final Throwable e, final long sequence, final Object event) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")") - .setCause(e)); - } - - @Override - public void handleOnStartException(final Throwable e) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error starting the disruptor") - .setCause(e)); - } - - @Override - public void handleOnShutdownException(final Throwable e) { - this.errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error stopping the disruptor") - .setCause(e)); - } -} \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java b/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java deleted file mode 100644 index 4b5413b..0000000 --- a/src/main/java/com/lmax/disruptor/WaitingMultiProducerSequencer.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; - -import sun.misc.Unsafe; - -import com.lmax.disruptor.util.Util; - - -/** - *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s. - * Suitable for use for sequencing across multiple publisher threads.

- * - *

*

Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call - * to {@link Sequencer#next()}, to determine the highest available sequence that can be read, then - * {@link Sequencer#getHighestPublishedSequence(long, long)} should be used. - */ -public final class WaitingMultiProducerSequencer extends AbstractSequencer implements WaitStrategy -{ - private static final Unsafe UNSAFE = Util.getUnsafe(); - private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); - private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); - - private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - - private final Lock lock = new ReentrantLock(); - private final Condition processorNotifyCondition = this.lock.newCondition(); - - private final AtomicBoolean running = new AtomicBoolean(true); - private final ExceptionHandler exceptionHandler; - - // availableBuffer tracks the state of each ringbuffer slot - // see below for more details on the approach - private final int[] availableBuffer; - private final int indexMask; - private final int indexShift; - - /** - * Construct a Sequencer with the selected wait strategy and buffer size. - * - * @param bufferSize the size of the buffer that this will sequence over. - * @param producerWaitStrategy for those producing sequences - * @param consumerWaitStrategy for those waiting on sequences. - */ - public WaitingMultiProducerSequencer(int bufferSize, ExceptionHandler exceptionHandler, WaitStrategy consumerWaitStrategy) - { - super(bufferSize, consumerWaitStrategy); - this.exceptionHandler = exceptionHandler; - - this.availableBuffer = new int[bufferSize]; - this.indexMask = bufferSize - 1; - this.indexShift = Util.log2(bufferSize); - initialiseAvailableBuffer(); - } - - public void halt() { - this.running.set(false); - signalAllWhenBlocking(); - } - - /** - * @see Sequencer#hasAvailableCapacity(int) - */ - @Override - public boolean hasAvailableCapacity(final int requiredCapacity) - { - return hasAvailableCapacity(this.gatingSequences, requiredCapacity, this.cursor.get()); - } - - private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) - { - long wrapPoint = cursorValue + requiredCapacity - this.bufferSize; - long cachedGatingSequence = this.gatingSequenceCache.get(); - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) - { - long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue); - this.gatingSequenceCache.set(minSequence); - - if (wrapPoint > minSequence) - { - return false; - } - } - - return true; - } - - /** - * @see Sequencer#claim(long) - */ - @Override - public void claim(long sequence) - { - this.cursor.set(sequence); - } - - /** - * @see Sequencer#next() - */ - @Override - public long next() - { - return next(1); - } - - /** - * @see Sequencer#next(int) - */ - @Override - public long next(int n) - { - if (n < 1) - { - throw new IllegalArgumentException("n must be > 0"); - } - - long current; - long next; - - do - { - current = this.cursor.get(); - next = current + n; - - long wrapPoint = next - this.bufferSize; - long cachedGatingSequence = this.gatingSequenceCache.get(); - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) - { - long gatingSequence = Util.getMinimumSequence(this.gatingSequences, current); - - if (wrapPoint > gatingSequence) - { -// try { -// waitFor(gatingSequence+1, this.cursor, null, null); // as soon as a spot frees by a consumer, continue -// } -// catch (final AlertException ex) -// { -// if (!this.running.get()) -// { -// break; -// } -// } catch (final Throwable ex) -// { -// this.exceptionHandler.handleEventException(ex, next, null); -// break; -// } - - LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? - continue; - } - - this.gatingSequenceCache.set(gatingSequence); - } - else if (this.cursor.compareAndSet(current, next)) - { - break; - } - } - while (true); - - return next; - } - - /** - * @see Sequencer#tryNext() - */ - @Override - public long tryNext() throws InsufficientCapacityException - { - return tryNext(1); - } - - /** - * @see Sequencer#tryNext(int) - */ - @Override - public long tryNext(int n) throws InsufficientCapacityException - { - if (n < 1) - { - throw new IllegalArgumentException("n must be > 0"); - } - - long current; - long next; - - do - { - current = this.cursor.get(); - next = current + n; - - if (!hasAvailableCapacity(this.gatingSequences, n, current)) - { - throw InsufficientCapacityException.INSTANCE; - } - } - while (!this.cursor.compareAndSet(current, next)); - - return next; - } - - /** - * @see Sequencer#remainingCapacity() - */ - @Override - public long remainingCapacity() - { - long consumed = Util.getMinimumSequence(this.gatingSequences, this.cursor.get()); - long produced = this.cursor.get(); - return getBufferSize() - (produced - consumed); - } - - private void initialiseAvailableBuffer() - { - for (int i = this.availableBuffer.length - 1; i != 0; i--) - { - setAvailableBufferValue(i, -1); - } - - setAvailableBufferValue(0, -1); - } - - /** - * @see Sequencer#publish(long) - */ - @Override - public void publish(final long sequence) - { - setAvailable(sequence); - this.waitStrategy.signalAllWhenBlocking(); - } - - /** - * @see Sequencer#publish(long, long) - */ - @Override - public void publish(long lo, long hi) - { - for (long l = lo; l <= hi; l++) - { - setAvailable(l); - } - this.waitStrategy.signalAllWhenBlocking(); - } - - /** - * The below methods work on the availableBuffer flag. - * - * The prime reason is to avoid a shared sequence object between publisher threads. - * (Keeping single pointers tracking start and end would require coordination - * between the threads). - * - * -- Firstly we have the constraint that the delta between the cursor and minimum - * gating sequence will never be larger than the buffer size (the code in - * next/tryNext in the Sequence takes care of that). - * -- Given that; take the sequence value and mask off the lower portion of the - * sequence as the index into the buffer (indexMask). (aka modulo operator) - * -- The upper portion of the sequence becomes the value to check for availability. - * ie: it tells us how many times around the ring buffer we've been (aka division) - * -- Because we can't wrap without the gating sequences moving forward (i.e. the - * minimum gating sequence is effectively our last available position in the - * buffer), when we have new data and successfully claimed a slot we can simply - * write over the top. - */ - private void setAvailable(final long sequence) - { - setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); - } - - private void setAvailableBufferValue(int index, int flag) - { - long bufferAddress = index * SCALE + BASE; - UNSAFE.putOrderedInt(this.availableBuffer, bufferAddress, flag); - } - - /** - * @see Sequencer#isAvailable(long) - */ - @Override - public boolean isAvailable(long sequence) - { - int index = calculateIndex(sequence); - int flag = calculateAvailabilityFlag(sequence); - long bufferAddress = index * SCALE + BASE; - return UNSAFE.getIntVolatile(this.availableBuffer, bufferAddress) == flag; - } - - @Override - public long getHighestPublishedSequence(long lowerBound, long availableSequence) - { - for (long sequence = lowerBound; sequence <= availableSequence; sequence++) - { - if (!isAvailable(sequence)) - { - return sequence - 1; - } - } - - return availableSequence; - } - - private int calculateAvailabilityFlag(final long sequence) - { - return (int) (sequence >>> this.indexShift); - } - - private int calculateIndex(final long sequence) - { - return (int) sequence & this.indexMask; - } - - @Override - public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) - throws AlertException, InterruptedException - { - long availableSequence; - - if ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence) - { - this.lock.lock(); - try - { - while ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence && this.running.get()) - { - this.processorNotifyCondition.await(); - } - } - finally - { - this.lock.unlock(); - } - } - - return availableSequence; - } - - @Override - public void signalAllWhenBlocking() - { - this.lock.lock(); - try - { - this.processorNotifyCondition.signalAll(); - } - finally - { - this.lock.unlock(); - } - } -} diff --git a/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java b/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java deleted file mode 100644 index 1da5680..0000000 --- a/src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - *

A {@link WaitingWorkProcessor} wraps a single {@link WorkHandler}, effectively consuming the sequence - * and ensuring appropriate barriers.

- * - *

Generally, this will be used as part of a {@link WaitingWorkerPool}.

- * - * @param event implementation storing the details for the work to processed. - */ -public final class WaitingWorkProcessor - implements EventProcessor -{ - private final AtomicBoolean running = new AtomicBoolean(false); - private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - private final RingBuffer ringBuffer; - private final SequenceBarrier sequenceBarrier; - private final WorkHandlerEarlyRelease workHandler; - private final ExceptionHandler exceptionHandler; - private final Sequence workSequence; - - private final EventReleaser eventReleaser = new EventReleaser() - { - @Override - public void release() - { - WaitingWorkProcessor.this.sequence.set(Long.MAX_VALUE); - } - }; - private WaitStrategy publisherStrategy; - - /** - * Construct a {@link WaitingWorkProcessor}. - * - * @param ringBuffer to which events are published. - * @param sequenceBarrier on which it is waiting. - * @param sequenceBarrier - * @param workHandler is the delegate to which events are dispatched. - * @param exceptionHandler to be called back when an error occurs - * @param workSequence from which to claim the next event to be worked on. It should always be initialised - * as {@link Sequencer#INITIAL_CURSOR_VALUE} - */ - public WaitingWorkProcessor(final RingBuffer ringBuffer, - final WaitStrategy publisherStrategy, - final SequenceBarrier sequenceBarrier, - final WorkHandlerEarlyRelease workHandler, - final ExceptionHandler exceptionHandler, - final Sequence workSequence) - { - this.ringBuffer = ringBuffer; - this.publisherStrategy = publisherStrategy; - this.sequenceBarrier = sequenceBarrier; - this.workHandler = workHandler; - this.exceptionHandler = exceptionHandler; - this.workSequence = workSequence; - - if (this.workHandler instanceof EventReleaseAware) - { - ((EventReleaseAware)this.workHandler).setEventReleaser(this.eventReleaser); - } - - workHandler.setProcessor(this); - } - - @Override - public Sequence getSequence() - { - return this.sequence; - } - - @Override - public void halt() - { - this.running.set(false); - this.sequenceBarrier.alert(); - } - - @Override - public boolean isRunning() - { - return this.running.get(); - } - - /** - * It is ok to have another thread re-run this method after a halt(). - * - * @throws IllegalStateException if this processor is already running - */ - @Override - public void run() - { - if (!this.running.compareAndSet(false, true)) - { - throw new IllegalStateException("Thread is already running"); - } - this.sequenceBarrier.clearAlert(); - - notifyStart(); - - WaitStrategy publisherStrategy = this.publisherStrategy; - boolean processedSequence = true; - long cachedAvailableSequence = Long.MIN_VALUE; - long nextSequence = this.sequence.get(); - T event = null; - while (true) - { - try - { - // if previous sequence was processed - fetch the next sequence and set - // that we have successfully processed the previous sequence - // typically, this will be true - // this prevents the sequence getting too far forward if an exception - // is thrown from the WorkHandler - if (processedSequence) - { - processedSequence = false; - do - { - nextSequence = this.workSequence.get() + 1L; - // this tells the producer that we are done with our sequence, and that it can reuse it's spot in the ring buffer - this.sequence.set(nextSequence - 1L); - } - while (!this.workSequence.compareAndSet(nextSequence - 1L, nextSequence)); -// publisherStrategy.signalAllWhenBlocking(); - } - - if (cachedAvailableSequence >= nextSequence) - { - event = this.ringBuffer.get(nextSequence); - this.workHandler.onEvent(nextSequence, event); - processedSequence = true; - } - else - { - cachedAvailableSequence = this.sequenceBarrier.waitFor(nextSequence); - } - } - catch (final AlertException ex) - { - if (!this.running.get()) - { - break; - } - } - catch (final Throwable ex) - { - // handle, mark as processed, unless the exception handler threw an exception - this.exceptionHandler.handleEventException(ex, nextSequence, event); - processedSequence = true; - } - } - - notifyShutdown(); - - this.running.set(false); - } - - private void notifyStart() - { - if (this.workHandler instanceof LifecycleAware) - { - try - { - ((LifecycleAware)this.workHandler).onStart(); - } - catch (final Throwable ex) - { - this.exceptionHandler.handleOnStartException(ex); - } - } - } - - private void notifyShutdown() - { - if (this.workHandler instanceof LifecycleAware) - { - try - { - ((LifecycleAware)this.workHandler).onShutdown(); - } - catch (final Throwable ex) - { - this.exceptionHandler.handleOnShutdownException(ex); - } - } - } - - public void release(long sequence) { - - } -} diff --git a/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java b/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java deleted file mode 100644 index 9568aa4..0000000 --- a/src/main/java/com/lmax/disruptor/WaitingWorkerPool.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.lmax.disruptor.util.Util; - -/** - * WorkerPool contains a pool of {@link WaitingWorkProcessor}s that will consume sequences so jobs can be farmed out across a pool of workers. - * Each of the {@link WaitingWorkProcessor}s manage and calls a {@link WorkHandler} to process the events. - * - * @param event to be processed by a pool of workers - */ -public final class WaitingWorkerPool -{ - private final AtomicBoolean started = new AtomicBoolean(false); - private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - private final RingBuffer ringBuffer; - // WorkProcessors are created to wrap each of the provided WorkHandlers - private final WaitingWorkProcessor[] workProcessors; - - /** - * Create a worker pool to enable an array of {@link WorkHandler}s to consume published sequences. - * - * This option requires a pre-configured {@link RingBuffer} which must have {@link RingBuffer#addGatingSequences(Sequence...)} - * called before the work pool is started. - * - * @param ringBuffer of events to be consumed. - * @param sequenceBarrier on which the workers will depend. - * @param sequenceBarrier2 - * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s. - * @param workHandlers to distribute the work load across. - */ - public WaitingWorkerPool(final RingBuffer ringBuffer, - final WaitStrategy publisherStrategy, - final SequenceBarrier sequenceBarrier, - final ExceptionHandler exceptionHandler, - final WorkHandlerEarlyRelease... workHandlers) - { - this.ringBuffer = ringBuffer; - final int numWorkers = workHandlers.length; - this.workProcessors = new WaitingWorkProcessor[numWorkers]; - - for (int i = 0; i < numWorkers; i++) - { - this.workProcessors[i] = new WaitingWorkProcessor(ringBuffer, - publisherStrategy, - sequenceBarrier, - workHandlers[i], - exceptionHandler, - this.workSequence); - } - } - - /** - * Get an array of {@link Sequence}s representing the progress of the workers. - * - * @return an array of {@link Sequence}s representing the progress of the workers. - */ - public Sequence[] getWorkerSequences() - { -// final Sequence[] sequences = new Sequence[this.workProcessors.length + 1]; - final Sequence[] sequences = new Sequence[this.workProcessors.length]; - for (int i = 0, size = this.workProcessors.length; i < size; i++) - { - sequences[i] = this.workProcessors[i].getSequence(); - } -// sequences[sequences.length - 1] = this.workSequence; - - return sequences; - } - - /** - * Start the worker pool processing events in sequence. - * - * @param executor providing threads for running the workers. - * @return the {@link RingBuffer} used for the work queue. - * @throws IllegalStateException if the pool has already been started and not halted yet - */ - public RingBuffer start(final Executor executor) - { - if (!this.started.compareAndSet(false, true)) - { - throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted."); - } - - final long cursor = this.ringBuffer.getCursor(); - this.workSequence.set(cursor); - - for (WaitingWorkProcessor processor : this.workProcessors) - { - processor.getSequence().set(cursor); - executor.execute(processor); - } - - return this.ringBuffer; - } - - /** - * Wait for the {@link RingBuffer} to drain of published events then halt the workers. - */ - public void drainAndHalt() - { - Sequence[] workerSequences = getWorkerSequences(); - while (this.ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences)) - { - Thread.yield(); - } - - for (WaitingWorkProcessor processor : this.workProcessors) - { - processor.halt(); - } - - this.started.set(false); - } - - /** - * Halt all workers immediately at the end of their current cycle. - */ - public void halt() - { - for (WaitingWorkProcessor processor : this.workProcessors) - { - processor.halt(); - } - - this.started.set(false); - } - - public boolean isRunning() - { - return this.started.get(); - } -} diff --git a/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java b/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java deleted file mode 100644 index 21492e0..0000000 --- a/src/main/java/com/lmax/disruptor/WorkHandlerEarlyRelease.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import com.lmax.disruptor.RingBuffer; - -/** - * Callback interface to be implemented for processing units of work as they become available in the {@link RingBuffer}. - * - * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. - * @see WaitingWorkerPool - */ -public interface WorkHandlerEarlyRelease -{ - /** - * Callback to indicate a unit of work needs to be processed. - * - * @param event published to the {@link RingBuffer} - * @throws Exception if the {@link WorkHandlerEarlyRelease} would like the exception handled further up the chain. - */ - void onEvent(long sequence, T event) throws Exception; - - void setProcessor(WaitingWorkProcessor workProcessor); -} diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index f40646c..0f5156f 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -9,8 +9,8 @@ import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; -import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -29,12 +29,12 @@ public class MultiMBassador implements IMessageBus { private final Collection errorHandlers = new ArrayDeque(); // private final TransferQueue dispatchQueue; - private final SimpleQueue dispatchQueue; + private final MpmcTransferArrayQueue dispatchQueue; private final SubscriptionManager subscriptionManager; // all threads that are available for asynchronous message dispatching - private final int numberOfThreads; +// private final int numberOfThreads; private final Collection threads; /** @@ -72,10 +72,10 @@ public class MultiMBassador implements IMessageBus { } numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads); - this.numberOfThreads = numberOfThreads; +// this.numberOfThreads = numberOfThreads; // this.dispatchQueue = new LinkedTransferQueue(); - this.dispatchQueue = new SimpleQueue(numberOfThreads); + this.dispatchQueue = new MpmcTransferArrayQueue(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.threads = new ArrayDeque(numberOfThreads); @@ -86,7 +86,7 @@ public class MultiMBassador implements IMessageBus { Runnable runnable = new Runnable() { @Override public void run() { - SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; + MpmcTransferArrayQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; // TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; Object message1; @@ -417,15 +417,15 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(final Object message1, final Object message2) { if (message1 != null && message2 != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message1, message2); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message1, message2); +// } +// }; // try { - this.dispatchQueue.transfer(runnable); +// this.dispatchQueue.transfer(runnable); // } catch (InterruptedException e) { // handlePublicationError(new PublicationError() // .setMessage("Error while adding an asynchronous message") @@ -438,16 +438,16 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(final Object message1, final Object message2, final Object message3) { if (message1 != null || message2 != null | message3 != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message1, message2, message3); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message1, message2, message3); +// } +// }; // try { - this.dispatchQueue.transfer(runnable); +// this.dispatchQueue.transfer(runnable); // } catch (InterruptedException e) { // handlePublicationError(new PublicationError() // .setMessage("Error while adding an asynchronous message") @@ -460,15 +460,15 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(long timeout, TimeUnit unit, final Object message) { if (message != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message); +// } +// }; // try { - this.dispatchQueue.tryTransfer(runnable, timeout, unit); +// this.dispatchQueue.tryTransfer(runnable, timeout, unit); // } catch (InterruptedException e) { // handlePublicationError(new PublicationError() // .setMessage("Error while adding an asynchronous message") @@ -480,15 +480,15 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2) { if (message1 != null && message2 != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message1, message2); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message1, message2); +// } +// }; // try { - this.dispatchQueue.tryTransfer(runnable, timeout, unit); +// this.dispatchQueue.tryTransfer(runnable, timeout, unit); // } catch (InterruptedException e) { // handlePublicationError(new PublicationError() // .setMessage("Error while adding an asynchronous message") @@ -502,15 +502,15 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2, final Object message3) { if (message1 != null && message2 != null && message3 != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message1, message2, message3); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message1, message2, message3); +// } +// }; // try { - this.dispatchQueue.tryTransfer(runnable, timeout, unit); +// this.dispatchQueue.tryTransfer(runnable, timeout, unit); // } catch (InterruptedException e) { // handlePublicationError(new PublicationError() // .setMessage("Error while adding an asynchronous message") diff --git a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java index 7a46962..0d42829 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java @@ -17,10 +17,12 @@ import dorkbox.util.messagebus.annotations.Handler; public class ReflectionUtils { public static StrongConcurrentSetV8 getMethods(Class target) { - return getMethods(target, new StrongConcurrentSetV8(16, .8F, 1)); + StrongConcurrentSetV8 hashSet = new StrongConcurrentSetV8(16, .8F, 1); + getMethods(target, hashSet); + return hashSet; } - public static StrongConcurrentSetV8 getMethods(Class target, StrongConcurrentSetV8 methods) { + private static void getMethods(Class target, StrongConcurrentSetV8 methods) { try { for (Method method : target.getDeclaredMethods()) { if (getAnnotation(method, Handler.class) != null) { @@ -33,7 +35,6 @@ public class ReflectionUtils { if (!target.equals(Object.class)) { getMethods(target.getSuperclass(), methods); } - return methods; } /** diff --git a/src/main/java/com/lmax/disruptor/MessageType.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java similarity index 83% rename from src/main/java/com/lmax/disruptor/MessageType.java rename to src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java index 43c13ce..97f48ff 100644 --- a/src/main/java/com/lmax/disruptor/MessageType.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java @@ -1,4 +1,4 @@ -package com.lmax.disruptor; +package dorkbox.util.messagebus.common.simpleq; /** * @author dorkbox, llc * Date: 2/2/15 diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java deleted file mode 100644 index 48bb6c1..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java +++ /dev/null @@ -1,226 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import java.util.concurrent.ThreadLocalRandom; - -public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField { - - /** The number of CPUs */ - private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; - - /** - * The number of times to spin (with randomly interspersed calls - * to Thread.yield) on multiprocessor before blocking when a node - * is apparently the first waiter in the queue. See above for - * explanation. Must be a power of two. The value is empirically - * derived -- it works pretty well across a variety of processors, - * numbers of CPUs, and OSes. - */ - private static final int FRONT_SPINS = 1 << 7; - - /** - * The number of times to spin before blocking when a node is - * preceded by another node that is apparently spinning. Also - * serves as an increment to FRONT_SPINS on phase changes, and as - * base average frequency for yielding during spins. Must be a - * power of two. - */ - private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; - - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - /** Creates a {@code EliminationStack} that is initially empty. */ - public MpmcArrayTransferQueue(final int size) { - super(size); - } - - /** - * - * @param item - * @param timed - * @param nanos - * @return the offset that the item was placed into - */ - public void put(final Object item, final boolean timed, final long nanos) { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long capacity = mask + 1; - final long[] sBuffer = this.sequenceBuffer; - - long producerIndex; - long pSeqOffset; - long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - - while (true) { - producerIndex = lvProducerIndex(); // LoadLoad - - pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, item); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore - - return; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - producerIndex - capacity <= consumerIndex && // test against cached cIndex - producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - // return false; - } - - // another producer has moved the sequence by one, retry 2 - busySpin(); - } - } - - public Object take(final boolean timed, final long nanos) { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long cSeqOffset; - long producerIndex = -1; // start with bogus value, hope we don't need it - - while (true) { - consumerIndex = lvConsumerIndex(); // LoadLoad - cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElementNoCast(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore - - return e; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - consumerIndex >= producerIndex && // test against cached pIndex - consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - // return null; - busySpin(); // empty, so busy spin - } - - // another consumer beat us and moved sequence ahead, retry 2 - busySpin(); - } - } - - private static final void busySpin() { - ThreadLocalRandom randomYields = ThreadLocalRandom.current(); - - // busy spin for the amount of time (roughly) of a CPU context switch -// int spins = spinsFor(); - int spins = CHAINED_SPINS; - for (;;) { - if (spins > 0) { - if (randomYields.nextInt(CHAINED_SPINS) == 0) { -// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); - break; - } - --spins; - } else { - break; - } - } - } - - /** - * Returns spin/yield value for a node with given predecessor and - * data mode. See above for explanation. - */ - private final static int spinsFor() { -// if (MP && pred != null) { -// if (previousNodeType != currentNodeType) { -// // in the process of changing modes -// return FRONT_SPINS + CHAINED_SPINS; -// } -// if (pred.isMatched()) { - // at the front of the queue - return FRONT_SPINS; -// } -// if (pred.waiter == null) { -// // previous is spinning -// return CHAINED_SPINS; -// } -// } -// -// return 0; - } - - @Override - public boolean offer(Node message) { - return false; - } - - @Override - public Node poll() { - return null; - } - - @Override - public Node peek() { - return null; - } - -// public int peekLast() { -// long currConsumerIndex; -// long currProducerIndex; -// -// while (true) { -// currConsumerIndex = lvConsumerIndex(); -// currProducerIndex = lvProducerIndex(); -// -// if (currConsumerIndex == currProducerIndex) { -// return TYPE_EMPTY; -// } -// -// final Object lpElementNoCast = lpElementNoCast(calcElementOffset(currConsumerIndex)); -// if (lpElementNoCast == null) { -// continue; -// } -// -// return lpType(lpElementNoCast); -// } -// } - - @Override - public int size() { - return 0; - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() == lvProducerIndex(); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java new file mode 100644 index 0000000..543c855 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java @@ -0,0 +1,968 @@ +package dorkbox.util.messagebus.common.simpleq.jctools; + +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; +import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; + +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TransferQueue; + + +public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField implements TransferQueue { + private static final int TYPE_EMPTY = 0; + private static final int TYPE_CONSUMER = 1; + private static final int TYPE_PRODUCER = 2; + + /** Is it multi-processor? */ + private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; + + private static int INPROGRESS_SPINS = MP ? 32 : 0; + private static int PUSH_SPINS = MP ? 512 : 0; + private static int POP_SPINS = MP ? 512 : 0; + + + /** + * The number of times to spin before blocking in timed waits. + * The value is empirically derived -- it works well across a + * variety of processors and OSes. Empirically, the best value + * seems not to vary with number of CPUs (beyond 2) so is just + * a constant. + */ + private static int PARK_TIMED_SPINS = MP ? 32 : 0; + + /** + * The number of times to spin before blocking in untimed waits. + * This is greater than timed value because untimed waits spin + * faster since they don't need to check times on each spin. + */ + private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; + + /** + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices. + */ + private static final long SPIN_THRESHOLD = 1000L; + + private final int consumerCount; + + public MpmcTransferArrayQueue(final int consumerCount) { + this(consumerCount, (int) Math.pow(Runtime.getRuntime().availableProcessors(),2)); + } + + public MpmcTransferArrayQueue(final int consumerCount, final int queueSize) { + super(Pow2.roundToPowerOfTwo(queueSize)); + this.consumerCount = consumerCount; + } + + private final static ThreadLocal nodeThreadLocal = new ThreadLocal() { + @Override + protected Object initialValue() { + return new Node(); + } + }; + + + /** + * PRODUCER method + *

+ * Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary. + */ + @Override + public final void transfer(final Object item) { + producerWait(item, false, 0L); + } + + /** + * CONSUMER + */ + @Override + public final Object take() { + return consumerWait(false, 0L); + } + + @Override + public boolean offer(Object item) { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long capacity = mask + 1; + final long[] sBuffer = this.sequenceBuffer; + + long producerIndex; + long pSeqOffset; + long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it + + while (true) { + producerIndex = lvProducerIndex(); // LoadLoad + + pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, item); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + return true; + } + // failed cas, retry 1 + } else if (delta < 0 && // poll has not moved this value forward + producerIndex - capacity <= consumerIndex && // test against cached cIndex + producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + return false; + } + + // another producer has moved the sequence by one, retry 2 + busySpin(PUSH_SPINS); + } + } + + @Override + public boolean offer(Object item, long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + long lastTime = System.nanoTime(); + + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long capacity = mask + 1; + final long[] sBuffer = this.sequenceBuffer; + + long producerIndex; + long pSeqOffset; + long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it + + while (true) { + producerIndex = lvProducerIndex(); // LoadLoad + + pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, item); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + return true; + } + // failed cas, retry 1 + } else if (delta < 0 && // poll has not moved this value forward + producerIndex - capacity <= consumerIndex && // test against cached cIndex + producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + + long now = System.nanoTime(); + long remaining = nanos -= now - lastTime; + lastTime = now; + + if (remaining > 0) { + if (remaining < SPIN_THRESHOLD) { + busySpin(PARK_UNTIMED_SPINS); + } else { + UNSAFE.park(false, 1L); + } + } else { + return false; + } + } + + // another producer has moved the sequence by one, retry 2 + busySpin(PUSH_SPINS); + } + } + + + @Override + public void put(Object item) throws InterruptedException { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long capacity = mask + 1; + final long[] sBuffer = this.sequenceBuffer; + + long producerIndex; + long pSeqOffset; + long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it + + while (true) { + producerIndex = lvProducerIndex(); // LoadLoad + + pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, item); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + return; + } + // failed cas, retry 1 + } else if (delta < 0 && // poll has not moved this value forward + producerIndex - capacity <= consumerIndex && // test against cached cIndex + producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + // return false; + busySpin(PUSH_SPINS); + } + + // another producer has moved the sequence by one, retry 2 + busySpin(PUSH_SPINS); + } + } + + + + + @Override + public Object poll() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long cSeqOffset; + long producerIndex = -1; // start with bogus value, hope we don't need it + + while (true) { + consumerIndex = lvConsumerIndex(); // LoadLoad + cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + return e; + } + // failed cas, retry 1 + } else if (delta < 0 && // slot has not been moved by producer + consumerIndex >= producerIndex && // test against cached pIndex + consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + return null; + } + + // another consumer beat us and moved sequence ahead, retry 2 + busySpin(POP_SPINS); + } + } + + @Override + public final boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. + return lvConsumerIndex() == lvProducerIndex(); + } + + @Override + public Object peek() { + long currConsumerIndex; + Object e; + do { + currConsumerIndex = lvConsumerIndex(); + // other consumers may have grabbed the element, or queue might be empty + e = lpElementNoCast(calcElementOffset(currConsumerIndex)); + // only return null if queue is empty + } while (e == null && currConsumerIndex != lvProducerIndex()); + return e; + } + + @Override + public int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + @Override + public boolean tryTransfer(Object item) { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + previousElement = null; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + switch (lastType) { + case TYPE_EMPTY: + case TYPE_PRODUCER: { + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + return false; + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + continue; + } + case TYPE_CONSUMER: { + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + soItem1(e, item); + unpark(e); + + return true; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); + continue; + } + } + } + } + + @Override + public boolean tryTransfer(Object item, long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + long lastTime = System.nanoTime(); + + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + previousElement = null; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + switch (lastType) { + case TYPE_EMPTY: + case TYPE_PRODUCER: { + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + long now = System.nanoTime(); + long remaining = nanos -= now - lastTime; + lastTime = now; + + if (remaining > 0) { + if (remaining < SPIN_THRESHOLD) { + busySpin(PARK_UNTIMED_SPINS); + } else { + UNSAFE.park(false, 1L); + } + // make sure to continue here (so we don't spin twice) + continue; + } else { + return false; + } + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + continue; + } + case TYPE_CONSUMER: { + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + soItem1(e, item); + unpark(e); + + return true; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); + continue; + } + } + } + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + long lastTime = System.nanoTime(); + + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long cSeqOffset; + long producerIndex = -1; // start with bogus value, hope we don't need it + + while (true) { + consumerIndex = lvConsumerIndex(); // LoadLoad + cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + return e; + } + // failed cas, retry 1 + } else if (delta < 0 && // slot has not been moved by producer + consumerIndex >= producerIndex && // test against cached pIndex + consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + + long now = System.nanoTime(); + long remaining = nanos -= now - lastTime; + lastTime = now; + + if (remaining > 0) { + if (remaining < SPIN_THRESHOLD) { + busySpin(PARK_UNTIMED_SPINS); + } else { + UNSAFE.park(false, 1L); + } + // make sure to continue here (so we don't spin twice) + continue; + } else { + return null; + } + } + + // another consumer beat us and moved sequence ahead, retry 2 + busySpin(POP_SPINS); + } + } + + @Override + public int remainingCapacity() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int drainTo(Collection c) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int drainTo(Collection c, int maxElements) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Object[] toArray(Object[] a) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean containsAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean addAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean removeAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean retainAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean hasWaitingConsumer() { + long consumerIndex; + long producerIndex; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + return false; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + return lpType(previousElement) == TYPE_CONSUMER; + } + } + } + + @Override + public int getWaitingConsumerCount() { + long consumerIndex; + long producerIndex; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + return 0; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + if (lpType(previousElement) == TYPE_CONSUMER) { + return (int) (producerIndex - consumerIndex); + } else { + return 0; + } + } + } + } + + + public final boolean hasPendingMessages() { + long consumerIndex; + long producerIndex; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + return true; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + return lpType(previousElement) != TYPE_CONSUMER || consumerIndex + this.consumerCount != producerIndex; + } + } + } + + private static final void busySpin(int spins) { + for (;;) { + if (spins > 0) { + --spins; + } else { + return; + } + } + } + + @SuppressWarnings("null") + private final void park(final Object node, final Thread myThread, final boolean timed, long nanos) { + long lastTime = timed ? System.nanoTime() : 0L; + int spins = -1; // initialized after first item and cancel checks + ThreadLocalRandom randomYields = null; // bound if needed + + for (;;) { + if (lvThread(node) == null) { + return; + } else if (myThread.isInterrupted() || timed && nanos <= 0) { + return; + } else if (spins < 0) { + if (timed) { + spins = PARK_TIMED_SPINS; + } else { + spins = PARK_UNTIMED_SPINS; + } + + if (spins > 0) { + randomYields = ThreadLocalRandom.current(); + } + } else if (spins > 0) { + if (randomYields.nextInt(256) == 0) { + Thread.yield(); // occasionally yield + } + --spins; + } else if (timed) { + long now = System.nanoTime(); + long remaining = nanos -= now - lastTime; + lastTime = now; + if (remaining > 0) { + if (remaining < SPIN_THRESHOLD) { + busySpin(PARK_UNTIMED_SPINS); + } else { + UNSAFE.park(false, nanos); + } + } else { + return; + } + } else { + // park can return for NO REASON (must check for thread values) + UNSAFE.park(false, 0L); + } + } + } + + private final void unpark(Object node) { + final Object thread = lpThread(node); + soThread(node, null); + UNSAFE.unpark(thread); + } + + private final void producerWait(final Object item, final boolean timed, final long nanos) { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + previousElement = null; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + switch (lastType) { + case TYPE_EMPTY: + case TYPE_PRODUCER: { + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_PRODUCER); + spThread(node, myThread); + spItem1(node, item); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + park(node, myThread, timed, nanos); + return; + } + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + continue; + } + case TYPE_CONSUMER: { + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + soItem1(e, item); + unpark(e); + + return; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); + continue; + } + } + } + } + + private final Object consumerWait(final boolean timed, final long nanos) { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + previousElement = null; + } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + switch (lastType) { + case TYPE_EMPTY: + case TYPE_CONSUMER: { + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_CONSUMER); + spThread(node, myThread); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + park(node, myThread, timed, nanos); + Object item1 = lvItem1(node); + + return item1; + } + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + continue; + } + case TYPE_PRODUCER: { + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + final Object lvItem1 = lpItem1(e); + unpark(e); + + return lvItem1; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); + continue; + } + } + } + } +} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java index f4f21ac..ca4c575 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java @@ -1,8 +1,7 @@ package dorkbox.util.messagebus.common.simpleq.jctools; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -import com.lmax.disruptor.MessageType; +import dorkbox.util.messagebus.common.simpleq.MessageType; abstract class ColdItems { public int type = 0; diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java deleted file mode 100644 index faaa1fc..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java +++ /dev/null @@ -1,525 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -import java.util.Collection; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TransferQueue; - - -public final class SimpleQueue extends MpmcArrayQueueConsumerField implements TransferQueue { - private static final int TYPE_EMPTY = 0; - private static final int TYPE_CONSUMER = 1; - private static final int TYPE_PRODUCER = 2; - - /** Is it multi-processor? */ - private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; - - private static int INPROGRESS_SPINS = MP ? 32 : 0; - private static int PUSH_SPINS = MP ? 512 : 0; - private static int POP_SPINS = MP ? 512 : 0; - - - /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - private static int PARK_TIMED_SPINS = MP ? 32 : 0; - - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - private static final long SPIN_THRESHOLD = 1000L; - - private final int consumerCount; - - public SimpleQueue(final int consumerCount) { - super(Pow2.roundToPowerOfTwo(consumerCount*Runtime.getRuntime().availableProcessors())); - this.consumerCount = consumerCount; - } - - private final static ThreadLocal nodeThreadLocal = new ThreadLocal() { - @Override - protected Object initialValue() { - return new Node(); - } - }; - - - /** - * PRODUCER method - *

- * Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary. - */ - @Override - public final void transfer(final Object item) { - - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long producerIndex; - int lastType; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - final Object previousElement; - if (consumerIndex == producerIndex) { - lastType = TYPE_EMPTY; - previousElement = null; - } else { - previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - lastType = lpType(previousElement); - } - - switch (lastType) { - case TYPE_EMPTY: - case TYPE_PRODUCER: { - // empty or same mode = push+park onto queue - long pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { - // Successful CAS: full barrier - - final Thread myThread = Thread.currentThread(); - final Object node = nodeThreadLocal.get(); - - spType(node, TYPE_PRODUCER); - spThread(node, myThread); - spItem1(node, item); - - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, node); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore - - park(node, myThread, false, 0); - return; - } - } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - continue; - } - case TYPE_CONSUMER: { - // complimentary mode = pop+unpark off queue - long cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElementNoCast(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore - - soItem1(e, item); - unpark(e); - - return; - } - } - - // whoops, inconsistent state - busySpin(POP_SPINS); - continue; - } - } - } - } - - /** - * CONSUMER - */ - @Override - public final Object take() { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long producerIndex; - int lastType; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - final Object previousElement; - if (consumerIndex == producerIndex) { - lastType = TYPE_EMPTY; - previousElement = null; - } else { - previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - lastType = lpType(previousElement); - } - - - switch (lastType) { - case TYPE_EMPTY: - case TYPE_CONSUMER: { - // empty or same mode = push+park onto queue - long pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { - // Successful CAS: full barrier - - final Thread myThread = Thread.currentThread(); - final Object node = nodeThreadLocal.get(); - - spType(node, TYPE_CONSUMER); - spThread(node, myThread); - - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, node); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore - - park(node, myThread, false, 0); - Object item1 = lvItem1(node); - - return item1; - } - } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - continue; - } - case TYPE_PRODUCER: { - // complimentary mode = pop+unpark off queue - long cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElementNoCast(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore - - final Object lvItem1 = lpItem1(e); - unpark(e); - - return lvItem1; - } - } - - // whoops, inconsistent state - busySpin(POP_SPINS); - continue; - } - } - } - } - - private static final void busySpin(int spins) { - for (;;) { - if (spins > 0) { - --spins; - } else { - return; - } - } - } - - @SuppressWarnings("null") - private final void park(final Object node, final Thread myThread, final boolean timed, long nanos) { - long lastTime = timed ? System.nanoTime() : 0L; - int spins = -1; // initialized after first item and cancel checks - ThreadLocalRandom randomYields = null; // bound if needed - - for (;;) { - if (lvThread(node) == null) { - return; - } else if (myThread.isInterrupted() || timed && nanos <= 0) { - return; - } else if (spins < 0) { - if (timed) { - spins = PARK_TIMED_SPINS; - } else { - spins = PARK_UNTIMED_SPINS; - } - - if (spins > 0) { - randomYields = ThreadLocalRandom.current(); - } - } else if (spins > 0) { - if (randomYields.nextInt(256) == 0) { - Thread.yield(); // occasionally yield - } - --spins; - } else if (timed) { - long now = System.nanoTime(); - long remaining = nanos -= now - lastTime; - if (remaining > 0) { - if (remaining < SPIN_THRESHOLD) { - busySpin(PARK_UNTIMED_SPINS); - } else { - UNSAFE.park(false, nanos); - } - } - lastTime = now; - } else { - // park can return for NO REASON (must check for thread values) - UNSAFE.park(false, 0L); - } - } - } - - private final void unpark(Object node) { - final Object thread = lpThread(node); - soThread(node, null); - UNSAFE.unpark(thread); - } - - @Override - public final boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() == lvProducerIndex(); - } - - public final boolean hasPendingMessages() { - // count the number of consumers waiting, it should be the same as the number of threads configured - long consumerIndex = lvConsumerIndex(); - long producerIndex = lvProducerIndex(); - - if (consumerIndex != producerIndex) { - final Object previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); - if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.consumerCount == producerIndex) { - return false; - } - } - - return true; - } - - - - - - - - - public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) { - } - - - @Override - public boolean offer(Object message) { - // TODO Auto-generated method stub - return false; - } - - @Override - public Object poll() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Object peek() { - long currConsumerIndex; - Object e; - do { - currConsumerIndex = lvConsumerIndex(); - // other consumers may have grabbed the element, or queue might be empty - e = lpElementNoCast(calcElementOffset(currConsumerIndex)); - // only return null if queue is empty - } while (e == null && currConsumerIndex != lvProducerIndex()); - return e; - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - - - - - - - - - - - - @Override - public void put(Object e) throws InterruptedException { - // TODO Auto-generated method stub - - } - - @Override - public boolean offer(Object e, long timeout, TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return false; - } - - @Override - public Object poll(long timeout, TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return null; - } - - @Override - public int remainingCapacity() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int drainTo(Collection c) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int drainTo(Collection c, int maxElements) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public Object[] toArray(Object[] a) { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean containsAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean addAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean removeAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean retainAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean tryTransfer(Object e) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean tryTransfer(Object e, long timeout, TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean hasWaitingConsumer() { - // TODO Auto-generated method stub - return false; - } - - @Override - public int getWaitingConsumerCount() { - // TODO Auto-generated method stub - return 0; - } -} diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java similarity index 76% rename from src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java rename to src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java index 5c4439c..047ee94 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java @@ -1,41 +1,39 @@ /* * Copyright 2012 Real Logic Ltd. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may + * obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package dorkbox.util.messagebus; +import java.util.concurrent.LinkedTransferQueue; + import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Node; -public class MpmcQueueAltConcurrentPerfTest { +public class LinkTransferQueueConcurrentNonBlockPerfTest { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 2; + private static final int concurrency = 4; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final MpmcArrayTransferQueue queue = new MpmcArrayTransferQueue(1 << 17); + final LinkedTransferQueue queue = new LinkedTransferQueue(); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -50,7 +48,7 @@ public class MpmcQueueAltConcurrentPerfTest { System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); } - private static long performanceRun(int runNumber, MpmcArrayTransferQueue queue) throws Exception { + private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { Producer[] producers = new Producer[concurrency]; Consumer[] consumers = new Consumer[concurrency]; @@ -99,42 +97,44 @@ public class MpmcQueueAltConcurrentPerfTest { } public static class Producer implements Runnable { - private final MpmcArrayTransferQueue queue; + private final LinkedTransferQueue queue; volatile long start; - public Producer(MpmcArrayTransferQueue queue) { + public Producer(LinkedTransferQueue queue) { this.queue = queue; } @Override public void run() { - MpmcArrayTransferQueue producer = this.queue; + LinkedTransferQueue producer = this.queue; int i = REPETITIONS; this.start = System.nanoTime(); do { - producer.put(TEST_VALUE, false, 0); + producer.put(TEST_VALUE); } while (0 != --i); } } public static class Consumer implements Runnable { - private final MpmcArrayTransferQueue queue; + private final LinkedTransferQueue queue; Object result; volatile long end; - public Consumer(MpmcArrayTransferQueue queue) { + public Consumer(LinkedTransferQueue queue) { this.queue = queue; } @Override public void run() { - MpmcArrayTransferQueue consumer = this.queue; + LinkedTransferQueue consumer = this.queue; Object result = null; int i = REPETITIONS; do { - result = consumer.take(false, 0); + while((result = consumer.poll()) == null) { + Thread.yield(); + } } while (0 != --i); this.result = result; diff --git a/src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java new file mode 100644 index 0000000..1b55482 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java @@ -0,0 +1,167 @@ +package dorkbox.util.messagebus; + +import org.openjdk.jol.info.ClassLayout; +import org.openjdk.jol.util.VMSupport; + +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; + +public class MpmcNonBlockPerfTest { + public static final int REPETITIONS = 50 * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << 17; + + private static final int concurrency = 4; + + public static void main(final String[] args) throws Exception { + System.out.println(VMSupport.vmDetails()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); + + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + + final int warmupRuns = 2; + final int runs = 5; + + long average = 0; + + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); + average = averageRun(warmupRuns, runs, queue); + +// SimpleQueue.INPROGRESS_SPINS = 64; +// SimpleQueue.POP_SPINS = 512; +// SimpleQueue.PUSH_SPINS = 512; +// SimpleQueue.PARK_SPINS = 512; +// +// for (int i = 128; i< 10000;i++) { +// int full = 2*i; +// +// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); +// SimpleQueue.PARK_SPINS = full; +// +// +// long newAverage = averageRun(warmupRuns, runs, queue); +// if (newAverage > average) { +// average = newAverage; +// System.err.println("Best value: " + i + " : " + newAverage); +// } +// } + + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); + } + + private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1_000_000_000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final MpmcTransferArrayQueue queue; + volatile long start; + + public Producer(MpmcTransferArrayQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcTransferArrayQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + try { + do { + producer.put(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final MpmcTransferArrayQueue queue; + Object result; + volatile long end; + + public Consumer(MpmcTransferArrayQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcTransferArrayQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + do { + while((result = consumer.poll()) == null) { + Thread.yield(); + } + } while (0 != --i); + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java deleted file mode 100644 index b30fbf7..0000000 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus; - -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; - -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.Node; - -public class MpmcQueueAltPerfTest { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - - public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final MpmcArrayTransferQueue queue = new MpmcArrayTransferQueue(QUEUE_CAPACITY); - - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); - } - - private static long performanceRun(int runNumber, MpmcArrayTransferQueue queue) throws Exception { - Producer p = new Producer(queue); - Thread thread = new Thread(p); - thread.start(); // producer will timestamp start - - MpmcArrayTransferQueue consumer = queue; - Object result; - int i = REPETITIONS; - do { - result = consumer.take(false, 0); - } while (0 != --i); - long end = System.nanoTime(); - - thread.join(); - long duration = end - p.start; - long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; - String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, ((Node)result).item1); - return ops; - } - - public static class Producer implements Runnable { - private final MpmcArrayTransferQueue queue; - long start; - - public Producer(MpmcArrayTransferQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - MpmcArrayTransferQueue producer = this.queue; - int i = REPETITIONS; - long s = System.nanoTime(); - - MpmcArrayQueue pool = new MpmcArrayQueue(2); - pool.offer(new Node()); - pool.offer(new Node()); - - Node node; - do { - node = pool.poll(); - node.item1 = TEST_VALUE; - - producer.put(node, false, 0); - - pool.offer(node); - } while (0 != --i); - this.start = s; - } - } -} diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java similarity index 86% rename from src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java rename to src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java index b26c6c0..d79537b 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java @@ -3,10 +3,10 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Node; -import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; -public class SimpleQueueAltPerfTest { +public class MpmcTransferPerfTest { public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); @@ -25,7 +25,7 @@ public class SimpleQueueAltPerfTest { long average = 0; - final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); average = averageRun(warmupRuns, runs, queue); // SimpleQueue.INPROGRESS_SPINS = 64; @@ -51,7 +51,7 @@ public class SimpleQueueAltPerfTest { System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); } - private static long averageRun(int warmUpRuns, int sumCount, SimpleQueue queue) throws Exception { + private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception { int runs = warmUpRuns + sumCount; final long[] results = new long[runs]; for (int i = 0; i < runs; i++) { @@ -67,7 +67,7 @@ public class SimpleQueueAltPerfTest { return sum/sumCount; } - private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception { + private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { Producer[] producers = new Producer[concurrency]; Consumer[] consumers = new Consumer[concurrency]; @@ -116,16 +116,16 @@ public class SimpleQueueAltPerfTest { } public static class Producer implements Runnable { - private final SimpleQueue queue; + private final MpmcTransferArrayQueue queue; volatile long start; - public Producer(SimpleQueue queue) { + public Producer(MpmcTransferArrayQueue queue) { this.queue = queue; } @Override public void run() { - SimpleQueue producer = this.queue; + MpmcTransferArrayQueue producer = this.queue; int i = REPETITIONS; this.start = System.nanoTime(); @@ -136,17 +136,17 @@ public class SimpleQueueAltPerfTest { } public static class Consumer implements Runnable { - private final SimpleQueue queue; + private final MpmcTransferArrayQueue queue; Object result; volatile long end; - public Consumer(SimpleQueue queue) { + public Consumer(MpmcTransferArrayQueue queue) { this.queue = queue; } @Override public void run() { - SimpleQueue consumer = this.queue; + MpmcTransferArrayQueue consumer = this.queue; Object result = null; int i = REPETITIONS;