From 7fa59650a854b568ed062d8830f4d5872ae96ce6 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 8 May 2015 17:32:19 +0200 Subject: [PATCH] WIP, code polish. discovered rare test hang --- .../jctools/MpmcTransferArrayQueue.java | 280 ++++++++++++------ .../common/simpleq/jctools/Node.java | 10 +- .../PerfTest_MpmcTransferArrayQueue.java | 16 +- ...fTest_MpmcTransferArrayQueue_NonBlock.java | 4 +- 4 files changed, 214 insertions(+), 96 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java index 3b57f7f..c9721f0 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java @@ -17,7 +17,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; import org.jctools.queues.MpmcArrayQueue; -import org.jctools.util.Pow2; import org.jctools.util.UnsafeAccess; @@ -29,9 +28,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme /** Is it multi-processor? */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; - private static int INPROGRESS_SPINS = MP ? 32 : 0; - private static int PUSH_SPINS = MP ? 512 : 0; - private static int POP_SPINS = MP ? 512 : 0; + private static final int INPROGRESS_SPINS = MP ? 32 : 0; + private static final int PUSH_SPINS = MP ? 32 : 0; + private static final int POP_SPINS = MP ? 512 : 0; /** @@ -41,14 +40,14 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - private static int PARK_TIMED_SPINS = MP ? 32 : 0; + private static final int PARK_TIMED_SPINS = MP ? 32 : 0; /** * The number of times to spin before blocking in untimed waits. * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. */ - private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; + private static final int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; /** * The number of nanoseconds for which it is faster to spin @@ -59,11 +58,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme private final int consumerCount; public MpmcTransferArrayQueue(final int consumerCount) { - this(consumerCount, (int) Math.pow(Runtime.getRuntime().availableProcessors(),2)); - } - - public MpmcTransferArrayQueue(final int consumerCount, final int queueSize) { - super(Pow2.roundToPowerOfTwo(queueSize)); + super(1024); // must be power of 2 this.consumerCount = consumerCount; } @@ -95,6 +90,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme public final Object take() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long consumerIndex; @@ -105,12 +101,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; - previousElement = null; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -120,11 +114,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme lastType = lpType(previousElement); } - switch (lastType) { - case TYPE_EMPTY: - case TYPE_CONSUMER: { - -// if (lastType != TYPE_PRODUCER) { + if (lastType != TYPE_PRODUCER) { // TYPE_EMPTY, TYPE_CONSUMER // empty or same mode = push+park onto queue long pSeqOffset = calcSequenceOffset(producerIndex, mask); @@ -133,7 +123,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (delta == 0) { // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { // Successful CAS: full barrier final Thread myThread = Thread.currentThread(); @@ -145,12 +136,12 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, node); + spElement(buffer, offset, node); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore park(node, myThread, false, 0L); Object item1 = lvItem1(node); @@ -161,28 +152,26 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // whoops, inconsistent state busySpin(PUSH_SPINS); - continue; - } -// else { - case TYPE_PRODUCER: { + } else { // TYPE_PRODUCER // complimentary mode = pop+unpark off queue long cSeqOffset = calcSequenceOffset(consumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(offset); - spElement(offset, null); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore final Object lvItem1 = lpItem1(e); unpark(e); @@ -193,8 +182,111 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // whoops, inconsistent state busySpin(POP_SPINS); - continue; } + } + } + + /** + * CONSUMER + *

+ * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will + * as long as necessary + */ + public final Object take(final Node node) { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + } else { + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + if (lastType != TYPE_PRODUCER) { + // TYPE_EMPTY, TYPE_CONSUMER + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); +// final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_CONSUMER); + spThread(node, myThread); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(buffer, offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore + + park(node, myThread, false, 0L); + Object item1 = lvItem1(node); + + return item1; + } + } + + // whoops, inconsistent state + busySpin(PUSH_SPINS); + } else { + // TYPE_PRODUCER + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore + + final Object lvItem1 = lpItem1(e); + unpark(e); + + return lvItem1; + } + } + + // whoops, inconsistent state + busySpin(POP_SPINS); } } } @@ -206,6 +298,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long producerIndex; @@ -221,17 +314,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (delta == 0) { // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, item); + spElement(buffer, offset, item); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore return true; } @@ -259,6 +353,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long producerIndex; @@ -274,17 +369,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (delta == 0) { // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, item); + spElement(buffer, offset, item); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore return true; } @@ -320,6 +416,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long producerIndex; @@ -335,17 +432,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (delta == 0) { // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, item); + spElement(buffer, offset, item); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore return; } @@ -370,6 +468,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme public Object poll() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long consumerIndex; @@ -380,20 +479,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); // LoadLoad cSeqOffset = calcSequenceOffset(consumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(offset); - spElement(offset, null); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore return e; } @@ -421,12 +521,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme @Override public Object peek() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + long currConsumerIndex; Object e; do { currConsumerIndex = lvConsumerIndex(); // other consumers may have grabbed the element, or queue might be empty - e = lpElement(calcElementOffset(currConsumerIndex)); + e = lpElement(buffer, calcElementOffset(currConsumerIndex, mask)); // only return null if queue is empty } while (e == null && currConsumerIndex != lvProducerIndex()); return e; @@ -455,6 +559,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme public boolean tryTransfer(Object item) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long consumerIndex; @@ -465,12 +570,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; - previousElement = null; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -499,20 +602,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // complimentary mode = pop+unpark off queue long cSeqOffset = calcSequenceOffset(consumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(offset); - spElement(offset, null); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore soItem1(e, item); unpark(e); @@ -540,6 +644,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long consumerIndex; @@ -550,20 +655,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); // LoadLoad cSeqOffset = calcSequenceOffset(consumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(offset); - spElement(offset, null); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore return e; } @@ -645,6 +751,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme @Override public boolean hasWaitingConsumer() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + long consumerIndex; long producerIndex; @@ -652,11 +762,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { return false; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -670,6 +779,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme @Override public int getWaitingConsumerCount() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + long consumerIndex; long producerIndex; @@ -677,11 +790,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { return 0; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -699,6 +811,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme public final boolean hasPendingMessages() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + long consumerIndex; long producerIndex; @@ -706,11 +822,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { return true; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -754,12 +869,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme spins = PARK_UNTIMED_SPINS; } - if (spins > 0) { - randomYields = ThreadLocalRandom.current(); - } + randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { - if (randomYields.nextInt(256) == 0) { - Thread.yield(); // occasionally yield + if (randomYields.nextInt(1024) == 0) { + UnsafeAccess.UNSAFE.park(false, 1L); } --spins; } else if (timed) { @@ -768,7 +881,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme lastTime = now; if (remaining > 0) { if (remaining < SPIN_THRESHOLD) { - // a park is too slow for this number, so just busy spin instead + // a park is too slow for this time, so just busy spin instead busySpin(PARK_UNTIMED_SPINS); } else { UnsafeAccess.UNSAFE.park(false, nanos); @@ -796,6 +909,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme private final boolean producerXfer(final Object item, final boolean timed, final long nanos) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; + final Object[] buffer = this.buffer; final long[] sBuffer = this.sequenceBuffer; long consumerIndex; @@ -806,12 +920,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme consumerIndex = lvConsumerIndex(); producerIndex = lvProducerIndex(); - final Object previousElement; if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; - previousElement = null; } else { - previousElement = lpElement(calcElementOffset(producerIndex-1)); + final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -830,7 +942,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (delta == 0) { // this is expected if we see this first time around - if (casProducerIndex(producerIndex, producerIndex + 1)) { + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { // Successful CAS: full barrier final Thread myThread = Thread.currentThread(); @@ -843,12 +956,12 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(producerIndex, mask); - spElement(offset, node); + spElement(buffer, offset, node); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore return park(node, myThread, timed, nanos); } @@ -861,20 +974,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // complimentary mode = pop+unpark off queue long cSeqOffset = calcSequenceOffset(consumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (consumerIndex + 1); + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; if (delta == 0) { - if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { // Successful CAS: full barrier // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(offset); - spElement(offset, null); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore soItem1(e, item); unpark(e); diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java index 3b42ffe..8c1eaf2 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java @@ -2,16 +2,14 @@ package dorkbox.util.messagebus.common.simpleq.jctools; import org.jctools.util.UnsafeAccess; -import dorkbox.util.messagebus.common.simpleq.MessageType; - abstract class ColdItems { public int type = 0; - public int messageType = MessageType.ONE; +// public int messageType = MessageType.ONE; public Object item1 = null; - public Object item2 = null; - public Object item3 = null; - public Object[] item4 = null; +// public Object item2 = null; +// public Object item3 = null; +// public Object[] item4 = null; } abstract class Pad0 extends ColdItems { diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java index b4b7f9b..3ce52d6 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue.java @@ -7,26 +7,31 @@ import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class PerfTest_MpmcTransferArrayQueue { +// static { +// System.setProperty("sparse.shift", "2"); +// } + + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 4; + private static final int concurrency = 2; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - final int warmupRuns = 2; + final int warmupRuns = 4; final int runs = 5; long average = 0; - final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency); average = averageRun(warmupRuns, runs, queue); // SimpleQueue.INPROGRESS_SPINS = 64; @@ -151,8 +156,9 @@ public class PerfTest_MpmcTransferArrayQueue { Object result = null; int i = REPETITIONS; + Node node = new Node(); do { - result = consumer.take(); + result = consumer.take(node); } while (0 != --i); this.result = result; diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java index 46f33b7..1ed67a1 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java @@ -11,12 +11,12 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock { private static final int concurrency = 1; public static void main(final String[] args) throws Exception { - final int warmupRuns = 2; + final int warmupRuns = 5; final int runs = 5; long average = 0; - final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency, QUEUE_CAPACITY); average = averageRun(warmupRuns, runs, queue); // SimpleQueue.INPROGRESS_SPINS = 64;