diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 0f5156f..b9eec9b 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -5,12 +5,13 @@ import java.util.ArrayDeque; import java.util.Collection; import java.util.concurrent.TimeUnit; +import org.jctools.util.Pow2; + import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java deleted file mode 100644 index 4ea0806..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.jctools; - - -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -import java.util.AbstractQueue; -import java.util.Iterator; - -abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue implements MessagePassingQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - -/** - * A concurrent access enabling class used by circular array based queues this class exposes an offset computation - * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and - * the array is padded on either side to help with False sharing prevention. It is expected that subclasses handle post - * padding. - *

- * Offset calculation is separate from access to enable the reuse of a give compute offset. - *

- * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a - * LoadLoad barrier. - *

- * - * @author nitsanw - * - * @param - */ -public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); - protected static final int BUFFER_PAD; - private static final long REF_ARRAY_BASE; - private static final int REF_ELEMENT_SHIFT; - - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - - BUFFER_PAD = 128 / scale; - // Including the buffer pad in the array base offset - REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) - + (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT); - } - protected final long mask; - // @Stable :( - protected final E[] buffer; - - @SuppressWarnings("unchecked") - public ConcurrentCircularArrayQueue(int capacity) { - int actualCapacity = Pow2.roundToPowerOfTwo(capacity); - this.mask = actualCapacity - 1; - // pad data on either end with some empty slots. - this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - } - - /** - * @param index desirable element index - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index) { - return calcElementOffset(index, this.mask); - } - /** - * @param index desirable element index - * @param mask - * @return the offset in bytes within the array for a given index. - */ - protected static final long calcElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } - - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e a kitty - */ - protected final void spElement(long offset, Object e) { - UNSAFE.putObject(this.buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(long offset, E e) { - UNSAFE.putOrderedObject(this.buffer, offset, e); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lpElement(long offset) { - return (E) UNSAFE.getObject(this.buffer, offset); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final Object lpElementNoCast(long offset) { - return UNSAFE.getObject(this.buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lvElement(long offset) { - return (E) UNSAFE.getObjectVolatile(this.buffer, offset); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override - public void clear() { - // we have to test isEmpty because of the weaker poll() guarantee - while (poll() != null || !isEmpty()) { - ; - } - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentSequencedCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentSequencedCircularArrayQueue.java deleted file mode 100644 index 3dbceea..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentSequencedCircularArrayQueue.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -public abstract class ConcurrentSequencedCircularArrayQueue extends ConcurrentCircularArrayQueue { - private static final long ARRAY_BASE; - private static final int ELEMENT_SHIFT; - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class); - if (8 == scale) { - ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unexpected long[] element size"); - } - // Including the buffer pad in the array base offset - ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << ELEMENT_SHIFT - SPARSE_SHIFT); - } - protected final long[] sequenceBuffer; - - public ConcurrentSequencedCircularArrayQueue(int capacity) { - super(capacity); - int actualCapacity = (int) (this.mask + 1); - // pad data on either end with some empty slots. - this.sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - for (long i = 0; i < actualCapacity; i++) { - soSequence(this.sequenceBuffer, calcSequenceOffset(i), i); - } - } - - protected final long calcSequenceOffset(long index) { - return calcSequenceOffset(index, this.mask); - } - protected static final long calcSequenceOffset(long index, long mask) { - return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT); - } - protected final void soSequence(long[] buffer, long offset, long e) { - UNSAFE.putOrderedLong(buffer, offset, e); - } - - protected final long lvSequence(long[] buffer, long offset) { - return UNSAFE.getLongVolatile(buffer, offset); - } - -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MessagePassingQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MessagePassingQueue.java deleted file mode 100644 index 4e3f140..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MessagePassingQueue.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import java.util.Queue; - -/** - * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface - * sufficient for concurrent message passing.
- * Message passing queues offer happens before semantics to messages passed through, namely that writes made by the - * producer before offering the message are visible to the consuming thread after the message has been polled out of the - * queue. - * - * @author nitsanw - * - * @param the event/message type - */ -interface MessagePassingQueue { - - /** - * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the - * {@link Queue#offer(Object)} interface. - * - * @param message - * @return true if element was inserted into the queue, false iff full - */ - boolean offer(M message); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#poll()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M poll(); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#peek()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M peek(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a - * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1). - * - * @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded - */ - int size(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the observation is carried out. - * - * @return true if empty, false otherwise - */ - boolean isEmpty(); - -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java deleted file mode 100755 index c192b6e..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.jctools; - - - -/** - * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that - * any and all threads may call the offer/poll/peek methods and correctness is maintained.
- * This implementation follows patterns documented on the package level for False Sharing protection.
- * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original - * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in - * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each - * field of the struct. There is a further alternative in the experimental project which uses iteration phase - * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as - * well as this implementation.
- * Tradeoffs to keep in mind: - *

    - *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of - * both arrays. We are trading memory to avoid false sharing(active and passive). - *
  2. 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the - * elements array. This is doubling/tripling the memory allocated for the buffer. - *
  3. Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or - * equal to the requested capacity. - *
- */ -public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { - /** The number of CPUs */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - - /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. - */ - private static final int SPINS = NCPU == 1 ? 0 : 600; - - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueue(final int capacity) { - super(validateCapacity(capacity)); - } - - private static int validateCapacity(int capacity) { - if(capacity < 2) { - throw new IllegalArgumentException("Minimum size is 2"); - } - return capacity; - } - - @Override - public boolean offer(final T e) { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long capacity = mask + 1; - final long[] sBuffer = this.sequenceBuffer; - - long producerIndex; - long pSeqOffset; - long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - - while (true) { - producerIndex = lvProducerIndex(); // LoadLoad - pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, e); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore - - return true; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - producerIndex - capacity <= consumerIndex && // test against cached cIndex - producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - return false; - } - - // another producer has moved the sequence by one, retry 2 - busySpin(); - } - } - - /** - * {@inheritDoc} - *

- * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll - * and must test producer index when next element is not visible. - */ - @Override - public T poll() { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long cSeqOffset; - long producerIndex = -1; // start with bogus value, hope we don't need it - - while (true) { - consumerIndex = lvConsumerIndex(); // LoadLoad - cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final T e = lpElement(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore - - return e; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - consumerIndex >= producerIndex && // test against cached pIndex - consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; - } - - // another consumer beat us and moved sequence ahead, retry 2 - // only producer will busy spin - } - } - - @Override - public T peek() { - long currConsumerIndex; - T e; - do { - currConsumerIndex = lvConsumerIndex(); - // other consumers may have grabbed the element, or queue might be empty - e = lpElement(calcElementOffset(currConsumerIndex)); - // only return null if queue is empty - } while (e == null && currConsumerIndex != lvProducerIndex()); - return e; - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() == lvProducerIndex(); - } - - private static final void busySpin() { - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; - for (;;) { - if (spins > 0) { - --spins; - } else { - break; - } - } - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java deleted file mode 100644 index 8b3568b..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java +++ /dev/null @@ -1,31 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -public abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad { - private final static long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long consumerIndex; - - public MpmcArrayQueueConsumerField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndex() { - return this.consumerIndex; - } - - protected final long lpConsumerIndex() { - return UNSAFE.getLong(this, C_INDEX_OFFSET); - } - - protected final boolean casConsumerIndex(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL1Pad.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL1Pad.java deleted file mode 100644 index 06f2318..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL1Pad.java +++ /dev/null @@ -1,10 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -abstract class MpmcArrayQueueL1Pad extends ConcurrentSequencedCircularArrayQueue { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL1Pad(int capacity) { - super(capacity); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL2Pad.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL2Pad.java deleted file mode 100644 index 78137fe..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueL2Pad.java +++ /dev/null @@ -1,10 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -abstract class MpmcArrayQueueL2Pad extends MpmcArrayQueueProducerField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL2Pad(int capacity) { - super(capacity); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueProducerField.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueProducerField.java deleted file mode 100644 index 84d6780..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueProducerField.java +++ /dev/null @@ -1,28 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; - -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -abstract class MpmcArrayQueueProducerField extends MpmcArrayQueueL1Pad { - private final static long P_INDEX_OFFSET; - static { - try { - P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class - .getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long producerIndex; - - public MpmcArrayQueueProducerField(int capacity) { - super(capacity); - } - - protected final long lvProducerIndex() { - return this.producerIndex; - } - - protected final boolean casProducerIndex(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } -} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java index 543c855..3b57f7f 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java @@ -10,15 +10,18 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; import java.util.Collection; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.util.Pow2; +import org.jctools.util.UnsafeAccess; -public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField implements TransferQueue { + +public final class MpmcTransferArrayQueue extends MpmcArrayQueue implements TransferQueue { private static final int TYPE_EMPTY = 0; private static final int TYPE_CONSUMER = 1; private static final int TYPE_PRODUCER = 2; @@ -79,19 +82,127 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField + * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will + * as long as necessary */ @Override public final Object take() { - return consumerWait(false, 0L); + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + final Object previousElement; + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + previousElement = null; + } else { + previousElement = lpElement(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + switch (lastType) { + case TYPE_EMPTY: + case TYPE_CONSUMER: { + +// if (lastType != TYPE_PRODUCER) { + // TYPE_EMPTY, TYPE_CONSUMER + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_CONSUMER); + spThread(node, myThread); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + park(node, myThread, false, 0L); + Object item1 = lvItem1(node); + + return item1; + } + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + continue; + } +// else { + case TYPE_PRODUCER: { + // TYPE_PRODUCER + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElement(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + final Object lvItem1 = lpItem1(e); + unpark(e); + + return lvItem1; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); + continue; + } + } + } } + + // modification of super implementation, as to include a small busySpin on contention @Override - public boolean offer(Object item) { + public final boolean offer(Object item) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; @@ -137,6 +248,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField 0) { - if (remaining < SPIN_THRESHOLD) { - busySpin(PARK_UNTIMED_SPINS); - } else { - UNSAFE.park(false, 1L); - } - // make sure to continue here (so we don't spin twice) - continue; - } else { - return false; - } - } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - continue; - } - case TYPE_CONSUMER: { - // complimentary mode = pop+unpark off queue - long cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElementNoCast(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore - - soItem1(e, item); - unpark(e); - - return true; - } - } - - // whoops, inconsistent state - busySpin(POP_SPINS); - continue; - } - } - } + return producerXfer(item, true, nanos); } @Override @@ -537,7 +558,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField 0) { if (remaining < SPIN_THRESHOLD) { + // a park is too slow for this number, so just busy spin instead busySpin(PARK_UNTIMED_SPINS); } else { - UNSAFE.park(false, nanos); + UnsafeAccess.UNSAFE.park(false, nanos); } } else { - return; + return false; } } else { // park can return for NO REASON (must check for thread values) - UNSAFE.park(false, 0L); + UnsafeAccess.UNSAFE.park(false, 0L); } } } @@ -760,10 +786,14 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField - *
    - *
  1. To construct class fields which allow volatile/ordered/plain access: This requirement is covered by - * {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach - * (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers. - *
  2. To construct flavors of {@link AtomicReferenceArray}. - *
  3. Other use cases exist but are not present in this library yet. - *
- * - * @author nitsanw - * - */ -public class UnsafeAccess { - public static final boolean SUPPORTS_GET_AND_SET; - public static final Unsafe UNSAFE; - static { - try { - final Field field = Unsafe.class.getDeclaredField("theUnsafe"); - field.setAccessible(true); - UNSAFE = (Unsafe) field.get(null); - } catch (Exception e) { - SUPPORTS_GET_AND_SET = false; - throw new RuntimeException(e); - } - boolean getAndSetSupport = false; - try { - Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE,Object.class); - getAndSetSupport = true; - } catch (Exception e) { - } - SUPPORTS_GET_AND_SET = getAndSetSupport; - } - -} diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java deleted file mode 100644 index c6d3c46..0000000 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus; - -import java.util.concurrent.LinkedTransferQueue; - -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; - -import dorkbox.util.messagebus.common.simpleq.jctools.Node; - -public class LinkTransferQueuePerfTest { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - - public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - - - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - - final LinkedTransferQueue queue = new LinkedTransferQueue(); - - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); - } - - - private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { - Producer p = new Producer(queue); - Thread thread = new Thread(p); - thread.start(); // producer will timestamp start - - LinkedTransferQueue consumer = queue; - Object result; - int i = REPETITIONS; - int queueEmpty = 0; - do { - result = consumer.take(); - } while (0 != --i); - long end = System.nanoTime(); - - thread.join(); - long duration = end - p.start; - long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; - String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, - qName, result, queueEmpty, p.queueFull); - return ops; - } - - public static class Producer implements Runnable { - private final LinkedTransferQueue queue; - int queueFull = 0; - long start; - - public Producer(LinkedTransferQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - LinkedTransferQueue producer = this.queue; - int i = REPETITIONS; - int f = 0; - long s = System.nanoTime(); - - try { - do { - producer.transfer(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - // log.error(e); - } - this.queueFull = f; - this.start = s; - } - } -} diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java new file mode 100644 index 0000000..90f813d --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java @@ -0,0 +1,149 @@ +/* + * Copyright 2012 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus; + +import java.util.concurrent.LinkedBlockingQueue; + +public class PerfTest_LinkedBlockingQueue { + // 15 == 32 * 1024 + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + + private static final int concurrency = 2; + + public static void main(final String[] args) throws Exception { + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17); + + final long[] results = new long[20]; + for (int i = 0; i < 20; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last 10 results for summary + long sum = 0; + for (int i = 10; i < 20; i++) { + sum += results[i]; + } + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + } + + private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final LinkedBlockingQueue queue; + volatile long start; + + public Producer(LinkedBlockingQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedBlockingQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + try { + do { + producer.put(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + } + } + + public static class Consumer implements Runnable { + private final LinkedBlockingQueue queue; + Object result; + volatile long end; + + public Consumer(LinkedBlockingQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedBlockingQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java new file mode 100644 index 0000000..6ac1775 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java @@ -0,0 +1,141 @@ +/* + * Copyright 2012 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus; + +import java.util.concurrent.LinkedBlockingQueue; + +public class PerfTest_LinkedBlockingQueue_NonBlock { + // 15 == 32 * 1024 + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + + private static final int concurrency = 2; + + public static void main(final String[] args) throws Exception { + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17); + + final long[] results = new long[20]; + for (int i = 0; i < 20; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last 10 results for summary + long sum = 0; + for (int i = 10; i < 20; i++) { + sum += results[i]; + } + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + } + + private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final LinkedBlockingQueue queue; + volatile long start; + + public Producer(LinkedBlockingQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedBlockingQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + do { + while (!producer.offer(TEST_VALUE)) { + Thread.yield(); + } + } while (0 != --i); + } + } + + public static class Consumer implements Runnable { + private final LinkedBlockingQueue queue; + Object result; + volatile long end; + + public Consumer(LinkedBlockingQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedBlockingQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + do { + while (null == (result = consumer.poll())) { + Thread.yield(); + } + } while (0 != --i); + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue.java similarity index 92% rename from src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue.java index ffccddb..bf9465e 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue.java @@ -14,24 +14,16 @@ package dorkbox.util.messagebus; import java.util.concurrent.LinkedTransferQueue; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; - -import dorkbox.util.messagebus.common.simpleq.jctools.Node; - -public class LinkTransferQueueConcurrentPerfTest { +public class PerfTest_LinkedTransferQueue { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 2; + private static final int concurrency = 1; public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); final LinkedTransferQueue queue = new LinkedTransferQueue(); diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java similarity index 92% rename from src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java index 047ee94..4e46ffd 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentNonBlockPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java @@ -14,12 +14,7 @@ package dorkbox.util.messagebus; import java.util.concurrent.LinkedTransferQueue; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; - -import dorkbox.util.messagebus.common.simpleq.jctools.Node; - -public class LinkTransferQueueConcurrentNonBlockPerfTest { +public class PerfTest_LinkedTransferQueue_NonBlock { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); @@ -29,9 +24,6 @@ public class LinkTransferQueueConcurrentNonBlockPerfTest { private static final int concurrency = 4; public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); final LinkedTransferQueue queue = new LinkedTransferQueue(); diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline.java similarity index 82% rename from src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline.java index aebd22a..59b2f2d 100755 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline.java @@ -15,12 +15,14 @@ */ package dorkbox.util.messagebus; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; +import org.jctools.queues.MpmcArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; -public class MpmcQueueBaselinePerfTest { +public class PerfTest_MpmcArrayQueue_Baseline { + static { + System.setProperty("sparse.shift", "2"); + } + // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final Integer TEST_VALUE = Integer.valueOf(777); @@ -28,11 +30,8 @@ public class MpmcQueueBaselinePerfTest { public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Integer.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); + final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -47,13 +46,13 @@ public class MpmcQueueBaselinePerfTest { System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); } - private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { + private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { Producer p = new Producer(queue); Thread thread = new Thread(p); thread.start(); // producer will timestamp start - MpmcArrayQueue consumer = queue; - Object result; + MpmcArrayQueue consumer = queue; + Integer result; int i = REPETITIONS; int queueEmpty = 0; do { @@ -74,17 +73,17 @@ public class MpmcQueueBaselinePerfTest { } public static class Producer implements Runnable { - private final MpmcArrayQueue queue; + private final MpmcArrayQueue queue; int queueFull = 0; long start; - public Producer(MpmcArrayQueue queue) { + public Producer(MpmcArrayQueue queue) { this.queue = queue; } @Override public void run() { - MpmcArrayQueue producer = this.queue; + MpmcArrayQueue producer = this.queue; int i = REPETITIONS; int f = 0; long s = System.nanoTime(); diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline_Node.java similarity index 91% rename from src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline_Node.java index c894fa8..381f37f 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Baseline_Node.java @@ -15,13 +15,11 @@ */ package dorkbox.util.messagebus; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; +import org.jctools.queues.MpmcArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Node; -public class MpmcQueueBaselineNodePerfTest { +public class PerfTest_MpmcArrayQueue_Baseline_Node { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final Integer TEST_VALUE = Integer.valueOf(777); @@ -29,9 +27,6 @@ public class MpmcQueueBaselineNodePerfTest { public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Concurrent.java similarity index 88% rename from src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Concurrent.java index b98f0f3..0af0c3a 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcArrayQueue_Concurrent.java @@ -15,25 +15,18 @@ */ package dorkbox.util.messagebus; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; +import org.jctools.queues.MpmcArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.Node; - -public class MpmcQueueConcurrentPerfTest { +public class PerfTest_MpmcArrayQueue_Concurrent { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 2; + private static final int concurrency = 1; public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); final MpmcArrayQueue queue = new MpmcArrayQueue(1 << 17); @@ -113,7 +106,9 @@ public class MpmcQueueConcurrentPerfTest { this.start = System.nanoTime(); do { - producer.offer(TEST_VALUE); + while (!producer.offer(TEST_VALUE)) { + Thread.yield(); + } } while (0 != --i); } } @@ -134,7 +129,9 @@ public class MpmcQueueConcurrentPerfTest { int i = REPETITIONS; do { - result = consumer.poll(); + while (null == (result = consumer.poll())) { + Thread.yield(); + } } while (0 != --i); this.result = result; diff --git a/src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java similarity index 98% rename from src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java index d79537b..b4b7f9b 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcTransferPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java @@ -6,15 +6,16 @@ import org.openjdk.jol.util.VMSupport; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Node; -public class MpmcTransferPerfTest { +public class PerfTest_MpmcTransferArrayQueue { public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 2; + private static final int concurrency = 4; public static void main(final String[] args) throws Exception { + System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); diff --git a/src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java similarity index 83% rename from src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java rename to src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java index 1b55482..46f33b7 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcNonBlockPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java @@ -1,25 +1,16 @@ package dorkbox.util.messagebus; -import org.openjdk.jol.info.ClassLayout; -import org.openjdk.jol.util.VMSupport; - import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; -import dorkbox.util.messagebus.common.simpleq.jctools.Node; -public class MpmcNonBlockPerfTest { - public static final int REPETITIONS = 50 * 1000 * 100; +public class PerfTest_MpmcTransferArrayQueue_NonBlock { + public static final int REPETITIONS = 50 * 1000 * 1000; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 4; + private static final int concurrency = 1; public static void main(final String[] args) throws Exception { - System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final int warmupRuns = 2; final int runs = 5; @@ -129,13 +120,11 @@ public class MpmcNonBlockPerfTest { int i = REPETITIONS; this.start = System.nanoTime(); - try { - do { - producer.put(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } + do { + while (!producer.offer(TEST_VALUE)) { + Thread.yield(); + } + } while (0 != --i); } } @@ -155,8 +144,8 @@ public class MpmcNonBlockPerfTest { int i = REPETITIONS; do { - while((result = consumer.poll()) == null) { - Thread.yield(); + while (null == (result = consumer.poll())) { + Thread.yield(); } } while (0 != --i);