From eacca670def98490222aefd8601ccf41daa9a1d3 Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 20 Apr 2015 21:25:59 +0200 Subject: [PATCH] working, but slow. Using MPMC array queue --- .../util/messagebus/common/simpleq/Node.java | 12 +- .../common/simpleq/SimpleQueue.java | 378 +++++++----------- .../jctools/ConcurrentCircularArrayQueue.java | 103 ++++- .../jctools/MpmcArrayQueueConsumerField.java | 4 + .../messagebus/SimpleQueueAltPerfTest.java | 2 +- 5 files changed, 246 insertions(+), 253 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 7dc0c75..8c3f8a5 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,6 +1,5 @@ package dorkbox.util.messagebus.common.simpleq; -import com.lmax.disruptor.MessageType; // mpmc sparse.shift = 2, for this to be fast. @@ -9,12 +8,11 @@ abstract class PrePad { } abstract class ColdItems { - public short type = MessageType.ONE; - public boolean isConsumer = false; +// public short type = MessageType.ONE; public Object item1 = null; - public Object item2 = null; - public Object item3 = null; - public Object[] item4 = null; +// public Object item2 = null; +// public Object item3 = null; +// public Object[] item4 = null; } abstract class Pad0 extends ColdItems { @@ -22,7 +20,7 @@ abstract class Pad0 extends ColdItems { } abstract class HotItem1 extends ColdItems { - public volatile Thread thread; +// public volatile Thread thread; } public class Node extends HotItem1 { diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index d9ea9ff..c5751ac 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -9,43 +9,16 @@ import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerFiel public final class SimpleQueue extends MpmcArrayQueueConsumerField { - private final static long THREAD; private static final long ITEM1_OFFSET; - private static final long TYPE; - private static final long CONSUMER; static { try { - CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); - THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); - TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type")); ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } } - private static final void spIsConsumer(Object node, boolean value) { - UNSAFE.putBoolean(node, CONSUMER, value); - } - - private static final boolean lpIsConsumer(Object node) { - return UNSAFE.getBoolean(node, CONSUMER); - } - - private static final boolean lvIsConsumer(Object node) { - return UNSAFE.getBooleanVolatile(node, CONSUMER); - } - - - private static final void spType(Object node, short type) { - UNSAFE.putShort(node, TYPE, type); - } - - private static final short lpType(Object node) { - return UNSAFE.getShort(node, TYPE); - } - private static final void soItem1(Object node, Object item) { UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item); } @@ -62,37 +35,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { return UNSAFE.getObject(node, ITEM1_OFFSET); } - private static final Thread lvThread(Object node) { - return (Thread) UNSAFE.getObjectVolatile(node, THREAD); - } - - private static final Thread lpThread(Object node) { - return (Thread) UNSAFE.getObject(node, THREAD); - } - - private static final Object cancelledMarker = new Object(); - - private static final boolean lpIsCanceled(Object node) { - return cancelledMarker == UNSAFE.getObject(node, THREAD); - } - - private static final void spIsCancelled(Object node) { - UNSAFE.putObject(node, THREAD, cancelledMarker); - } - - private static final void soThread(Object node, Thread newValue) { - UNSAFE.putOrderedObject(node, THREAD, newValue); - } - - private static final void spThread(Object node, Thread newValue) { - UNSAFE.putObject(node, THREAD, newValue); - } - - private static final boolean casThread(Object node, Object expect, Object newValue) { - return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue); - } - - /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -104,7 +46,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. * The value here is approximately the average of those across a range of tested systems. */ - private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 + private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000 /** * The number of times to spin before blocking in timed waits. @@ -114,7 +56,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { * a constant. */ static final int maxTimedSpins = NCPU < 2 ? 0 : 32; - static final int negMaxTimedSpins = -maxTimedSpins; /** * The number of times to spin before blocking in untimed waits. @@ -122,6 +63,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { * faster since they don't need to check times on each spin. */ static final int maxUntimedSpins = maxTimedSpins * 16; + static final int negMaxUntimedSpins = -maxUntimedSpins; /** * The number of nanoseconds for which it is faster to spin @@ -129,8 +71,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { */ static final long spinForTimeoutThreshold = 1000L; - - long p40, p41, p42, p43, p44, p45, p46; long p30, p31, p32, p33, p34, p35, p36, p37; @@ -167,15 +107,17 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException { - boolean isConsumer = item == null; + final Boolean isConsumer = Boolean.valueOf(item == null); + final boolean consumerBoolValue = isConsumer.booleanValue(); // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = this.mask + 1; final long[] sBuffer = this.sequenceBuffer; - long currentConsumerIndex; - long currentProducerIndex; + // values we shouldn't reach + long currentConsumerIndex = -1; + long currentProducerIndex = Long.MAX_VALUE; long cSeqOffset; long pSeqOffset; @@ -187,90 +129,59 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { boolean empty = false; while (true) { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - currentConsumerIndex = lvConsumerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad // empty or same mode - // check what was last placed on the queue - if (currentProducerIndex == currentConsumerIndex) { - empty = true; - } else { - final long previousProducerIndex = currentProducerIndex - 1; + // push+park onto queue - final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask); - final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad - final long ppDelta = ppSeq - previousProducerIndex; - - if (ppDelta == 1) { - // same mode check - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(previousProducerIndex, mask); - Object element = lpElement(offset); - sameMode = lpIsConsumer(element) == isConsumer; - } else if (ppDelta < 1 && // slot has not been moved by producer - currentConsumerIndex >= currentProducerIndex && // test against cached pIndex - currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must - - // is empty - empty = true; - } else { - // hasn't been moved yet. retry 2 - busySpin(); - continue; - } - } + // we add ourselves to the queue and check status (maybe we park OR we undo, pop-consumer, and unpark) + pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); + pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + pDelta = pSeq - currentProducerIndex; + if (pDelta == 0) { + // this is expected if we see this first time around + final long nextProducerIndex = currentProducerIndex + 1; + if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { + // Successful CAS: full barrier - if (empty || sameMode) { - // push+park onto queue + // it is possible that two threads check the queue at the exact same time, + // BOTH can think that the queue is empty, resulting in a deadlock between threads + // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when + // in reality, it is. + currentConsumerIndex = lvConsumerIndex(); + empty = currentProducerIndex == currentConsumerIndex; - // we add ourselves to the queue and wait - pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); - pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - pDelta = pSeq - currentProducerIndex; - if (pDelta == 0) { - // this is expected if we see this first time around - final long nextProducerIndex = currentProducerIndex + 1; - if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { - // Successful CAS: full barrier + if (!empty) { + final long previousProducerIndex = currentProducerIndex - 1; + final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask); + final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad + final long ppDelta = ppSeq - previousProducerIndex; - // it is possible that two threads check the queue at the exact same time, - // BOTH can think that the queue is empty, resulting in a deadlock between threads - // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when - // in reality, it is. - currentConsumerIndex = lvConsumerIndex(); + if (ppDelta == 1) { + // same mode check - if (empty && currentProducerIndex != currentConsumerIndex) { - // RESET the push of this element. - empty = false; - casProducerIndex(nextProducerIndex, currentProducerIndex); - continue; - } else if (sameMode && currentProducerIndex == currentConsumerIndex) { - sameMode = false; - casProducerIndex(nextProducerIndex, currentProducerIndex); - continue; + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(previousProducerIndex, mask); + sameMode = lpElementType(offset) == isConsumer; } + } - + if (empty || sameMode) { // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentProducerIndex, mask); - final Object element = lpElement(offset); - spIsConsumer(element, isConsumer); - spThread(element, Thread.currentThread()); + spElementType(offset, isConsumer); + spElementThread(offset, Thread.currentThread()); - if (isConsumer) { + final Object element = lpElement(offset); + if (consumerBoolValue) { // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore // now we wait - park(element, timed, nanos); + park(element, offset, timed, nanos); return lvItem1(element); } else { spItem1(element, item); @@ -280,99 +191,103 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore // now we wait - park(element, timed, nanos); + park(element, offset, timed, nanos); return null; } - } - // failed cas, retry 1 - } else if (pDelta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex - currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - // return; - busySpin(); - } - } else { - // complimentary mode - - // get item - cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long nextConsumerIndex = currentConsumerIndex + 1; - final long cDelta = cSeq - nextConsumerIndex; - - if (cDelta == 0) { - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); - final Object element = lpElement(offset); - final Thread thread = lpThread(element); - - if (isConsumer) { - if (thread == null || // is cancelled/fulfilled already - !casThread(element, thread, null)) { // failed cas state - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - } - continue; - } - - // success - Object item1 = lpItem1(element); - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - LockSupport.unpark(thread); - - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - return item1; - } - - continue; } else { - soItem1(element, item); + // complimentary mode - if (thread == null || // is cancelled/fulfilled already - !casThread(element, thread, null)) { // failed cas state + // undo my push, since we will be poping off the queue instead + casProducerIndex(nextProducerIndex, currentProducerIndex); - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + while (true) { + // get item + cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); + final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long nextConsumerIndex = currentConsumerIndex + 1; + final long cDelta = cSeq - nextConsumerIndex; + + if (cDelta == 0) { + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentConsumerIndex, mask); + final Thread thread = lpElementThread(offset); + + if (consumerBoolValue) { + if (thread == null || // is cancelled/fulfilled already + !casElementThread(offset, thread, null)) { // failed cas state + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + } + + continue; + } + + // success + final Object element = lpElement(offset); + Object item1 = lpItem1(element); + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + LockSupport.unpark(thread); + + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + return item1; + } + + busySpin(); + // failed CAS + continue; + } else { + final Object element = lpElement(offset); + soItem1(element, item); + + if (thread == null || // is cancelled/fulfilled already + !casElementThread(offset, thread, null)) { // failed cas state + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + } + + // lost CAS + busySpin(); + continue; + } + + // success + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + + LockSupport.unpark(thread); + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + return null; + } + + // lost CAS + busySpin(); + continue; + } } - continue; } - - // success - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - - LockSupport.unpark(thread); - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - - return null; - } - - // lost CAS - busySpin(); - continue; } - - } else if (cDelta < 0 && // slot has not been moved by producer - currentConsumerIndex >= currentProducerIndex && // test against cached pIndex - currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - // return null; - busySpin(); - } + // failed cas, retry 1 + } else if (pDelta < 0 && // poll has not moved this value forward + currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex + currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + // return; } + // contention. busySpin(); } @@ -380,7 +295,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { private static final void busySpin() { // busy spin for the amount of time (roughly) of a CPU context switch - int spins = maxUntimedSpins; + int spins = SPINS; for (;;) { if (spins > 0) { --spins; @@ -395,7 +310,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { * @return * @return false if we were interrupted, true if we were unparked by another thread */ - private static final boolean park(Object myNode, boolean timed, long nanos) throws InterruptedException { + private final boolean park(Object myNode, long myOffset, boolean timed, long nanos) throws InterruptedException { // long lastTime = timed ? System.nanoTime() : 0; // int spins = timed ? maxTimedSpins : maxUntimedSpins; int spins = maxUntimedSpins; @@ -413,51 +328,26 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { // busy spin for the amount of time (roughly) of a CPU context switch // then park (if necessary) - int spin = spins; for (;;) { - if (lvThread(myNode) == null) { + if (lvElementThread(myOffset) == null) { return true; - } else if (spin > 0) { - --spin; - } else if (spin > negMaxTimedSpins) { + } else if (spins > 0) { + --spins; + } else if (spins > negMaxUntimedSpins) { + --spins; LockSupport.parkNanos(1); } else { // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. LockSupport.park(); if (myThread.isInterrupted()) { - casThread(myNode, myThread, null); + casElementThread(myOffset, myThread, null); return false; } } } } -// /** -// * Unparks the other node (if it was waiting) -// */ -// private static final void unpark(Object otherNode) { -// Thread myThread = Thread.currentThread(); -// Thread thread; -// -// for (;;) { -// thread = getThread(otherNode); -// if (threadCAS(otherNode, thread, myThread)) { -// if (thread == null) { -// // no parking (UNPARK won the race) -// return; -// } else if (thread != myThread) { -// // park will always set the waiter back to null -// LockSupport.unpark(thread); -// return; -// } else { -// // contention -// busySpin(); -// } -// } -// } -// } - @Override public boolean offer(Node message) { return false; diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java index 66f59eb..2d598af 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java @@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue imp * @param */ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); + protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); protected static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; @@ -63,6 +63,8 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular protected final long mask; // @Stable :( protected final E[] buffer; + protected final Boolean[] typeBuffer; + protected final Thread[] threadBuffer; @SuppressWarnings("unchecked") public ConcurrentCircularArrayQueue(int capacity) { @@ -70,6 +72,8 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular this.mask = actualCapacity - 1; // pad data on either end with some empty slots. this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; + this.typeBuffer = new Boolean[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; + this.threadBuffer = new Thread[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; } /** @@ -109,6 +113,48 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular UNSAFE.putObject(buffer, offset, e); } + /** + * A plain store (no ordering/fences) of an element TYPE to a given offset + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e a kitty + */ + protected final void spElementType(long offset, Boolean e) { + spElementType(this.typeBuffer, offset, e); + } + + /** + * A plain store (no ordering/fences) of an element TYPE to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e an orderly kitty + */ + protected final void spElementType(Boolean[] buffer, long offset, Boolean e) { + UNSAFE.putObject(buffer, offset, e); + } + + /** + * A plain store (no ordering/fences) of an element TYPE to a given offset + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e a kitty + */ + protected final void spElementThread(long offset, Thread e) { + spElementThread(this.threadBuffer, offset, e); + } + + /** + * A plain store (no ordering/fences) of an element TYPE to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e an orderly kitty + */ + protected final void spElementThread(Thread[] buffer, long offset, Thread e) { + UNSAFE.putObject(buffer, offset, e); + } + /** * An ordered store(store + StoreStore barrier) of an element to a given offset * @@ -152,6 +198,34 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular return (E) UNSAFE.getObject(buffer, offset); } + /** + * A plain load (no ordering/fences) of an element TYPE from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final Boolean lpElementType(long offset) { + return lpElementType(this.typeBuffer, offset); + } + + protected final Boolean lpElementType(Boolean[] buffer, long offset) { + return (Boolean) UNSAFE.getObject(buffer, offset); + } + + /** + * A plain load (no ordering/fences) of an element THREAD from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final Thread lpElementThread(long offset) { + return lpElementThread(this.threadBuffer, offset); + } + + protected final Thread lpElementThread(Thread[] buffer, long offset) { + return (Thread) UNSAFE.getObject(buffer, offset); + } + /** * A volatile load (load + LoadLoad barrier) of an element from a given offset. * @@ -174,6 +248,33 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular return (E) UNSAFE.getObjectVolatile(buffer, offset); } + /** + * A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final Thread lvElementThread(long offset) { + return lvElementThread(this.threadBuffer, offset); + } + + /** + * A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final Thread lvElementThread(Thread[] buffer, long offset) { + return (Thread) UNSAFE.getObjectVolatile(buffer, offset); + } + + + protected final boolean casElementThread(long offset, Thread expect, Thread newValue) { + return UNSAFE.compareAndSwapObject(this.threadBuffer, offset, expect, newValue); + } + + @Override public Iterator iterator() { throw new UnsupportedOperationException(); diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java index b0e9f38..8b3568b 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueueConsumerField.java @@ -21,6 +21,10 @@ public abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad return this.consumerIndex; } + protected final long lpConsumerIndex() { + return UNSAFE.getLong(this, C_INDEX_OFFSET); + } + protected final boolean casConsumerIndex(long expect, long newValue) { return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); } diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java index 8e90bd4..1b6cff3 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java @@ -23,7 +23,7 @@ import dorkbox.util.messagebus.common.simpleq.SimpleQueue; public class SimpleQueueAltPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);