diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java index 603cf51..7f34856 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java @@ -2,20 +2,25 @@ package dorkbox.util.messagebus.common.simpleq; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.LockSupport; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; -import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField { + public static final int TYPE_FREE = 0; + public static final int TYPE_CONSUMER = 1; + public static final int TYPE_PRODUCER = 2; + public static final int TYPE_CANCELED = 3; + // private final static long THREAD; -// private static final long TYPE_OFFSET; +// private static final long IS_CONSUMER; private static final long ITEM1_OFFSET; static { try { -// TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); +// IS_CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); // THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); } catch (NoSuchFieldException e) { @@ -31,6 +36,15 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField return UNSAFE.getObject(node, ITEM1_OFFSET); } +// static final boolean lpType(Object node) { +// return UNSAFE.getBoolean(node, IS_CONSUMER); +// } +// +// static final void spType(Object node, boolean isConsumer) { +// UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer); +// } + + // public final Thread get() { // return (Thread) UNSAFE.getObject(this, THREAD); // } @@ -45,39 +59,26 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField // } /** The number of CPUs */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); + private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. + * The number of times to spin (with randomly interspersed calls + * to Thread.yield) on multiprocessor before blocking when a node + * is apparently the first waiter in the queue. See above for + * explanation. Must be a power of two. The value is empirically + * derived -- it works pretty well across a variety of processors, + * numbers of CPUs, and OSes. */ - private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000 - - /** The number of slots in the elimination array. */ - private static final int ARENA_LENGTH = Pow2.roundToPowerOfTwo((NCPU + 1) / 2); - - /** The mask value for indexing into the arena. */ - private static int ARENA_MASK = ARENA_LENGTH - 1; - - /** The number of times to step ahead, probe, and try to match. */ - private static final int LOOKAHEAD = Math.min(4, NCPU); - - /** The number of times to spin per lookahead step */ - private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD; - - /** A marker indicating that the arena slot is free. */ - private static final Object FREE = null; - - /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ - private static final Object WAITER = new Object(); - - - /** The arena where slots can be used to perform an exchange. */ - private final AtomicReference[] arena; + private static final int FRONT_SPINS = 1 << 7; + /** + * The number of times to spin before blocking when a node is + * preceded by another node that is apparently spinning. Also + * serves as an increment to FRONT_SPINS on phase changes, and as + * base average frequency for yielding during spins. Must be a + * power of two. + */ + private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; long p40, p41, p42, p43, p44, p45, p46; long p30, p31, p32, p33, p34, p35, p36, p37; @@ -95,145 +96,357 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) { // on 64bit(no compressed oops) JVM this is the same as seqOffset final long elementOffset = calcElementOffset(currentProducerIndex, mask); + spElementType(elementOffset, TYPE_FREE); soElement(elementOffset, new Node()); } - - this.arena = new PaddedAtomicReference[ARENA_LENGTH]; - for (int i = 0; i < ARENA_LENGTH; i++) { - this.arena[i] = new PaddedAtomicReference(); - } } - /** - * PRODUCER - */ - public void put(Object item) { - final Thread thread = Thread.currentThread(); + public Object xfer(final Object item, final boolean timed, final long nanos, final int incomingType) throws InterruptedException { + // empty or same mode = push+park onto queue + // complimentary mode = unpark+pop off queue // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = this.mask + 1; final long[] sBuffer = this.sequenceBuffer; -// long currentConsumerIndex; - long currentProducerIndex; + long currentProducerIndex = -1; // start with bogus value, hope we don't need it long pSeqOffset; - long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it + + long currentConsumerIndex = Long.MAX_VALUE; // start with bogus value, hope we don't need it + long cSeqOffset; + + int prevElementType; while (true) { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. -// currentConsumerIndex = lvConsumerIndex(); // LoadLoad + currentConsumerIndex = lvConsumerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad + + // the sequence must be loaded before we get the previous node type, otherwise previousNodeType can be stale/ pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; + final long pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier + // if the queue is EMPTY, this will return TYPE_FREE + // other consumers may have grabbed the element, or queue might be empty + // may be FREE or CONSUMER/PRODUCER + final long prevElementOffset = calcElementOffset(currentProducerIndex - 1, mask); + prevElementType = lpElementType(prevElementOffset); - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentProducerIndex, mask); - Object lpElement = lpElementNoCast(offset); - spItem1(lpElement, item); + if (prevElementType == TYPE_CANCELED) { + // pop off and continue. - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore + cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - return; + final long nextConsumerIndex = currentConsumerIndex + 1; + final long delta = seq - nextConsumerIndex; + + if (delta == 0) { + if (prevElementType != lpElementType(prevElementOffset)) { + // inconsistent read of type. + continue; + } + + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentConsumerIndex, mask); + spElementType(offset, TYPE_FREE); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + } + // failed cas, retry 1 + } else if (delta < 0 && // slot has not been moved by producer + currentConsumerIndex >= currentProducerIndex && // test against cached pIndex + currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + // return null; + + // spin + busySpin(prevElementType, incomingType); } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= cIndex && // test against cached cIndex - currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] -// return null; + + // another consumer beat us and moved sequence ahead, retry 2 + // only producer busyspins + continue; } - // another producer has moved the sequence by one, retry 2 - // only producer will busySpin if contention - busySpin(); + // it is possible that two threads check the queue at the exact same time, + // BOTH can think that the queue is empty, resulting in a deadlock between threads + // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when + // in reality, it is. + + if (prevElementType == TYPE_FREE || prevElementType == incomingType) { + // empty or same mode = push+park onto queue + + if (timed && nanos <= 0) { + // can't wait + return null; + } + + final long delta = pSeq - currentProducerIndex; + if (delta == 0) { + if (prevElementType != lpElementType(prevElementOffset)) { + // inconsistent read of type. + continue; + } + + // this is expected if we see this first time around + final long nextProducerIndex = currentProducerIndex + 1; + if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentProducerIndex, mask); + + if (prevElementType == TYPE_FREE && currentProducerIndex != currentConsumerIndex) { + // inconsistent read + + // try to undo, if possible. + if (!casProducerIndex(nextProducerIndex, currentProducerIndex)) { + spElementType(offset, TYPE_CANCELED); + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore + } + + continue; + } + + spElementType(offset, incomingType); + spElementThread(offset, Thread.currentThread()); + + if (incomingType == TYPE_CONSUMER) { + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore + + park(offset, timed, nanos); + + Object lpElement = lpElementNoCast(offset); + Object lpItem1 = lpItem1(lpElement); + + return lpItem1; + } else { + // producer + Object lpElement = lpElementNoCast(offset); + spItem1(lpElement, item); + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore + + park(offset, timed, nanos); + + return null; + } + } + // failed cas, retry 1 + } else if (delta < 0 && // poll has not moved this value forward + currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex + currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + // return false; + // we spin if the queue is full + } + + // another producer has moved the sequence by one, retry 2 + + // only producer will busySpin + busySpin(prevElementType, incomingType); + } + else { + // complimentary mode = unpark+pop off queue + + cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + + final long nextConsumerIndex = currentConsumerIndex + 1; + final long delta = seq - nextConsumerIndex; + + if (delta == 0) { + if (prevElementType != lpElementType(prevElementOffset)) { + // inconsistent read of type. + continue; + } + + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + + if (prevElementType != lpElementType(prevElementOffset)) { + System.err.println("WAHT"); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + continue; + } + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentConsumerIndex, mask); + spElementType(offset, TYPE_FREE); + + final Object thread = lpElementThread(offset); + final Object lpElement = lpElementNoCast(offset); + + if (incomingType == TYPE_CONSUMER) { + // is already cancelled/fulfilled + if (thread == null || + !casElementThread(offset, thread, null)) { // FULL barrier + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + continue; + } + + Object returnItem = lpItem1(lpElement); + spItem1(lpElement, null); + + UNSAFE.unpark(thread); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + return returnItem; + } else { + // producer + spItem1(lpElement, item); + + // is already cancelled/fulfilled + if (thread == null || + !casElementThread(offset, thread, null)) { // FULL barrier + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + continue; + } + + UNSAFE.unpark(thread); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + return null; + } + } + // failed cas, retry 1 + } else if (delta < 0 && // slot has not been moved by producer + currentConsumerIndex >= currentProducerIndex && // test against cached pIndex + currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + // return null; + + // spin + busySpin(prevElementType, incomingType); + } + + // another consumer beat us and moved sequence ahead, retry 2 + // only producer busyspins + } } } - /** - * CONSUMER - */ - public Object take() { - final Thread thread = Thread.currentThread(); + private static final void busySpin(int previousNodeType, int currentNodeType) { + ThreadLocalRandom randomYields = null; // bound if needed + randomYields = ThreadLocalRandom.current(); - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - - long currentConsumerIndex; -// long currentProducerIndex; - long cSeqOffset; - long pIndex = -1; // start with bogus value, hope we don't need it - - while (true) { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - currentConsumerIndex = lvConsumerIndex(); // LoadLoad -// currentProducerIndex = lvProducerIndex(); // LoadLoad - - - cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (currentConsumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); - final Object e = lpElementNoCast(offset); - Object item = lpItem1(e); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore - - return item; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - currentConsumerIndex >= pIndex && // test against cached pIndex - currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// return null; - - // contention. - busySpin(); - } - - // another consumer beat us and moved sequence ahead, retry 2 - // only producer busyspins - } - } - - private static final void busySpin() { // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; + int spins = spinsFor(previousNodeType, currentNodeType); +// int spins = 512; for (;;) { if (spins > 0) { --spins; + if (randomYields.nextInt(CHAINED_SPINS) == 0) { + LockSupport.parkNanos(1); // occasionally yield + } } else { break; } } } + /** + * Returns spin/yield value for a node with given predecessor and + * data mode. See above for explanation. + */ + private final static int spinsFor(int previousNodeType, int currentNodeType) { +// if (MP && pred != null) { +// if (previousNodeType != currentNodeType) { +// // in the process of changing modes +// return FRONT_SPINS + CHAINED_SPINS; +// } +// if (pred.isMatched()) { + // at the front of the queue + return FRONT_SPINS; +// } +// if (pred.waiter == null) { +// // previous is spinning +// return CHAINED_SPINS; +// } +// } +// +// return 0; + } + + + private final void park(long offset, final boolean timed, final long nanos) throws InterruptedException { +// long lastTime = timed ? System.nanoTime() : 0; +// int spins = timed ? maxTimedSpins : maxUntimedSpins; + int spins = 2000; + Thread myThread = Thread.currentThread(); + +// if (timed) { +// long now = System.nanoTime(); +// nanos -= now - lastTime; +// lastTime = now; +// if (nanos <= 0) { +//// s.tryCancel(e); +// continue; +// } +// } + + // busy spin for the amount of time (roughly) of a CPU context switch + // then park (if necessary) + for (;;) { + if (lpElementThread(offset) == null) { + return; + } else if (spins > 0) { + --spins; +// } else if (spins > negMaxUntimedSpins) { +// --spins; +// LockSupport.parkNanos(1); + } else { + // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. + LockSupport.park(); + + if (myThread.isInterrupted()) { + casElementThread(offset, myThread, null); + Thread.interrupted(); + throw new InterruptedException(); + } + } + } + } + + + + + + + + @Override public boolean offer(Node message) { return false; diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 3308355..1c038e7 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -13,6 +13,7 @@ abstract class ColdItems extends PrePad { // private static AtomicInteger count = new AtomicInteger(); // public final int ID = count.getAndIncrement(); +// public boolean isConsumer = false; // public short type = MessageType.ONE; public Object item1 = null; // public Object item2 = null; @@ -26,7 +27,6 @@ abstract class Pad0 extends ColdItems { } abstract class HotItem1 extends Pad0 { -// public transient volatile boolean isConsumer = false; } abstract class Pad1 extends HotItem1 { diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index a0adb08..17e6c04 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -1,9 +1,10 @@ package dorkbox.util.messagebus.common.simpleq; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; -public final class SimpleQueue extends LinkedArrayList { +import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; + +public final class SimpleQueue { /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -40,18 +41,17 @@ public final class SimpleQueue extends LinkedArrayList { */ static final long spinForTimeoutThreshold = 1000L; - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; + private MpmcExchangerQueue queue; public SimpleQueue(final int size) { - super(size); + this.queue = new MpmcExchangerQueue(Pow2.roundToPowerOfTwo(size)); } /** * PRODUCER */ public void put(Object item) throws InterruptedException { - xfer(item, false, 0); + this.queue.xfer(item, false, 0, MpmcExchangerQueue.TYPE_PRODUCER); } @@ -59,136 +59,138 @@ public final class SimpleQueue extends LinkedArrayList { * CONSUMER */ public Object take() throws InterruptedException { - return xfer(null, false, 0); +// this.queue.xfer(123, false, 0, MpmcExchangerQueue.TYPE_PRODUCER); +// return 123; + return this.queue.xfer(null, false, 0, MpmcExchangerQueue.TYPE_CONSUMER); } - private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException { - final boolean isConsumer= item == null; - - while (true) { - // empty or same mode = push+park onto queue - // complimentary mode = unpark+pop off queue - - final Object tail = lvTail(); // LoadLoad - final Object head = lvHead(); // LoadLoad - Object thread; - + private Object xfer(Object item, boolean timed, long nanos, byte incomingType) throws InterruptedException { + return null; +// while (true) { +// empty or same mode = push+park onto queue +// complimentary mode = unpark+pop off queue +// +// Object thread; +// // it is possible that two threads check the queue at the exact same time, // BOTH can think that the queue is empty, resulting in a deadlock between threads // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when // in reality, it is. - boolean empty = head == lpNext(tail); - boolean sameMode = lpType(tail) == isConsumer; - - // check to make sure we are not "double dipping" on our head - thread = lpThread(head); - if (empty && thread != null) { - empty = false; - } else if (sameMode && thread == null) { - busySpin(); - continue; - } - - // empty or same mode = push+park onto queue - if (empty || sameMode) { - if (timed && nanos <= 0) { - // can't wait - return null; - } - - final Object tNext = lpNext(tail); - if (tail != lvTail()) { // LoadLoad - // inconsistent read - busySpin(); - continue; - } - - if (isConsumer) { - spType(tNext, isConsumer); - spThread(tNext, Thread.currentThread()); - - if (!advanceTail(tail, tNext)) { // FULL barrier - // failed to link in -// busySpin(); - continue; - } - - park(tNext, timed, nanos); - - // this will only advance head if necessary +// final boolean empty = this.queue.isEmpty(); // LoadLoad +// boolean sameMode; +// if (!empty) { +// sameMode = this.queue.getLastMessageType() == isConsumer; +// } +// +// // check to make sure we are not "double dipping" on our head +//// thread = lpThread(head); +//// if (empty && thread != null) { +//// empty = false; +//// } else if (sameMode && thread == null) { +//// busySpin(); +//// continue; +//// } +// +// // empty or same mode = push+park onto queue +// if (empty || sameMode) { +// if (timed && nanos <= 0) { +// // can't wait +// return null; +// } +// +// final Object tNext = lpNext(tail); +// if (tail != lvTail()) { // LoadLoad +// // inconsistent read +// busySpin(); +// continue; +// } +// +// if (isConsumer) { +// spType(tNext, isConsumer); +// spThread(tNext, Thread.currentThread()); +// +// if (!advanceTail(tail, tNext)) { // FULL barrier +// // failed to link in +//// busySpin(); +// continue; +// } +// +// park(tNext, timed, nanos); +// +// // this will only advance head if necessary +//// advanceHead(tail, tNext); +// Object lpItem1 = lpItem1(tNext); +// spItem1(tNext, null); +// return lpItem1; +// } else { +// // producer +// spType(tNext, isConsumer); +// spItem1(tNext, item); +// spThread(tNext, Thread.currentThread()); +// +// if (!advanceTail(tail, tNext)) { // FULL barrier +// // failed to link in +// continue; +// } +// +// park(tNext, timed, nanos); +// +// // this will only advance head if necessary // advanceHead(tail, tNext); - Object lpItem1 = lpItem1(tNext); - spItem1(tNext, null); - return lpItem1; - } else { - // producer - spType(tNext, isConsumer); - spItem1(tNext, item); - spThread(tNext, Thread.currentThread()); - - if (!advanceTail(tail, tNext)) { // FULL barrier - // failed to link in - continue; - } - - park(tNext, timed, nanos); - - // this will only advance head if necessary - advanceHead(tail, tNext); - return null; - } - } - // complimentary mode = unpark+pop off queue - else { - Object next = lpNext(head); - - if (tail != lvTail() || head != lvHead()) { // LoadLoad - // inconsistent read - busySpin(); - continue; - } - - thread = lpThread(head); - if (isConsumer) { - Object returnVal = lpItem1(head); - - // is already cancelled/fulfilled - if (thread == null || - !casThread(head, thread, null)) { // FULL barrier - - // head was already used by a different thread - advanceHead(head, next); // FULL barrier - - continue; - } - - spItem1(head, null); - LockSupport.unpark((Thread) thread); - - advanceHead(head, next); // FULL barrier - - return returnVal; - } else { - // producer - spItem1(head, item); // StoreStore - - // is already cancelled/fulfilled - if (thread == null || - !casThread(head, thread, null)) { // FULL barrier - - // head was already used by a different thread - advanceHead(head, next); // FULL barrier - - continue; - } - - LockSupport.unpark((Thread) thread); - - advanceHead(head, next); // FULL barrier - return null; - } - } - } +// return null; +// } +// } +// // complimentary mode = unpark+pop off queue +// else { +// Object next = lpNext(head); +// +// if (tail != lvTail() || head != lvHead()) { // LoadLoad +// // inconsistent read +// busySpin(); +// continue; +// } +// +// thread = lpThread(head); +// if (isConsumer) { +// Object returnVal = lpItem1(head); +// +// // is already cancelled/fulfilled +// if (thread == null || +// !casThread(head, thread, null)) { // FULL barrier +// +// // head was already used by a different thread +// advanceHead(head, next); // FULL barrier +// +// continue; +// } +// +// spItem1(head, null); +// LockSupport.unpark((Thread) thread); +// +// advanceHead(head, next); // FULL barrier +// +// return returnVal; +// } else { +// // producer +// spItem1(head, item); // StoreStore +// +// // is already cancelled/fulfilled +// if (thread == null || +// !casThread(head, thread, null)) { // FULL barrier +// +// // head was already used by a different thread +// advanceHead(head, next); // FULL barrier +// +// continue; +// } +// +// LockSupport.unpark((Thread) thread); +// +// advanceHead(head, next); // FULL barrier +// return null; +// } +// } +// } } private static final void busySpin2() { @@ -215,44 +217,44 @@ public final class SimpleQueue extends LinkedArrayList { } } - private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException { -// long lastTime = timed ? System.nanoTime() : 0; -// int spins = timed ? maxTimedSpins : maxUntimedSpins; - int spins = maxUntimedSpins; - Thread myThread = Thread.currentThread(); - -// if (timed) { -// long now = System.nanoTime(); -// nanos -= now - lastTime; -// lastTime = now; -// if (nanos <= 0) { -//// s.tryCancel(e); -// continue; -// } -// } - - // busy spin for the amount of time (roughly) of a CPU context switch - // then park (if necessary) - for (;;) { - if (lpThread(myNode) == null) { - return; -// } else if (spins > 0) { +// private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException { +//// long lastTime = timed ? System.nanoTime() : 0; +//// int spins = timed ? maxTimedSpins : maxUntimedSpins; +// int spins = maxUntimedSpins; +// Thread myThread = Thread.currentThread(); +// +//// if (timed) { +//// long now = System.nanoTime(); +//// nanos -= now - lastTime; +//// lastTime = now; +//// if (nanos <= 0) { +////// s.tryCancel(e); +//// continue; +//// } +//// } +// +// // busy spin for the amount of time (roughly) of a CPU context switch +// // then park (if necessary) +// for (;;) { +// if (lpThread(myNode) == null) { +// return; +//// } else if (spins > 0) { +//// --spins; +// } else if (spins > negMaxUntimedSpins) { // --spins; - } else if (spins > negMaxUntimedSpins) { - --spins; -// LockSupport.parkNanos(1); - } else { - // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. - LockSupport.park(); - - if (myThread.isInterrupted()) { - casThread(myNode, myThread, null); - Thread.interrupted(); - throw new InterruptedException(); - } - } - } - } +//// LockSupport.parkNanos(1); +// } else { +// // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. +// LockSupport.park(); +// +// if (myThread.isInterrupted()) { +// casThread(myNode, myThread, null); +// Thread.interrupted(); +// throw new InterruptedException(); +// } +// } +// } +// } // @Override // public boolean isEmpty() { diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java index 65a4e30..e8b532a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java @@ -45,6 +45,7 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular protected static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; + static { final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); if (4 == scale) { @@ -99,18 +100,7 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param e a kitty */ protected final void spElement(long offset, E e) { - spElement(this.buffer, offset, e); - } - - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void spElement(E[] buffer, long offset, E e) { - UNSAFE.putObject(buffer, offset, e); + UNSAFE.putObject(this.buffer, offset, e); } /** @@ -119,40 +109,29 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @param e a kitty */ - protected final void spElementType(long offset, Boolean e) { - spElementType(this.typeBuffer, offset, e); + protected final void spElementType(long offset, int e) { + UNSAFE.putInt(this.typeBuffer, offset, e); } /** - * A plain store (no ordering/fences) of an element TYPE to a given offset + * An ordered store(store + StoreStore barrier) of an element TYPE to a given offset * * @param buffer this.buffer * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @param e an orderly kitty */ - protected final void spElementType(Boolean[] buffer, long offset, Boolean e) { - UNSAFE.putObject(buffer, offset, e); + protected final void soElementType(long offset, int e) { + UNSAFE.putOrderedInt(this.typeBuffer, offset, e); } /** - * A plain store (no ordering/fences) of an element TYPE to a given offset + * A plain store (no ordering/fences) of an element THREAD to a given offset * * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @param e a kitty */ protected final void spElementThread(long offset, Thread e) { - spElementThread(this.threadBuffer, offset, e); - } - - /** - * A plain store (no ordering/fences) of an element TYPE to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void spElementThread(Thread[] buffer, long offset, Thread e) { - UNSAFE.putObject(buffer, offset, e); + UNSAFE.putObject(this.threadBuffer, offset, e); } /** @@ -162,18 +141,7 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param e an orderly kitty */ protected final void soElement(long offset, E e) { - soElement(this.buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(E[] buffer, long offset, E e) { - UNSAFE.putOrderedObject(buffer, offset, e); + UNSAFE.putOrderedObject(this.buffer, offset, e); } /** @@ -182,35 +150,19 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final E lpElement(long offset) { - return lpElement(this.buffer, offset); - } - - protected final Object lpElementNoCast(long offset) { - return lpElementNoCast(this.buffer, offset); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ @SuppressWarnings("unchecked") - protected final E lpElement(E[] buffer, long offset) { - return (E) UNSAFE.getObject(buffer, offset); + protected final E lpElement(long offset) { + return (E) UNSAFE.getObject(this.buffer, offset); } /** * A plain load (no ordering/fences) of an element from a given offset. * - * @param buffer this.buffer * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final Object lpElementNoCast(E[] buffer, long offset) { - return UNSAFE.getObject(buffer, offset); + protected final Object lpElementNoCast(long offset) { + return UNSAFE.getObject(this.buffer, offset); } /** @@ -219,12 +171,18 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final Boolean lpElementType(long offset) { - return lpElementType(this.typeBuffer, offset); + protected final int lpElementType(long offset) { + return UNSAFE.getInt(this.typeBuffer, offset); } - protected final Boolean lpElementType(Boolean[] buffer, long offset) { - return (Boolean) UNSAFE.getObject(buffer, offset); + /** + * A volatile load (load + LoadLoad barrier) of an element TYPE from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final int lvElementType(long offset) { + return UNSAFE.getIntVolatile(this.typeBuffer, offset); } /** @@ -233,12 +191,8 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final Thread lpElementThread(long offset) { - return lpElementThread(this.threadBuffer, offset); - } - - protected final Thread lpElementThread(Thread[] buffer, long offset) { - return (Thread) UNSAFE.getObject(buffer, offset); + protected final Object lpElementThread(long offset) { + return UNSAFE.getObject(this.threadBuffer, offset); } /** @@ -247,20 +201,9 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final E lvElement(long offset) { - return lvElement(this.buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ @SuppressWarnings("unchecked") - protected final E lvElement(E[] buffer, long offset) { - return (E) UNSAFE.getObjectVolatile(buffer, offset); + protected final E lvElement(long offset) { + return (E) UNSAFE.getObjectVolatile(this.buffer, offset); } /** @@ -269,26 +212,17 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} * @return the element at the offset */ - protected final Thread lvElementThread(long offset) { - return lvElementThread(this.threadBuffer, offset); + protected final Object lvElementThread(long offset) { + return UNSAFE.getObjectVolatile(this.threadBuffer, offset); } - /** - * A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final Thread lvElementThread(Thread[] buffer, long offset) { - return (Thread) UNSAFE.getObjectVolatile(buffer, offset); - } - - - protected final boolean casElementThread(long offset, Thread expect, Thread newValue) { + protected final boolean casElementThread(long offset, Object expect, Object newValue) { return UNSAFE.compareAndSwapObject(this.threadBuffer, offset, expect, newValue); } + protected final boolean casElementType(long offset, int expect, int newValue) { + return UNSAFE.compareAndSwapInt(this.typeBuffer, offset, expect, newValue); + } @Override public Iterator iterator() { diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java index 994b7b1..45542bb 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java @@ -18,12 +18,12 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue; import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; public class MpmcQueueAltPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); @@ -33,7 +33,7 @@ public class MpmcQueueAltPerfTest { System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY); + final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -48,16 +48,20 @@ public class MpmcQueueAltPerfTest { System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); } - private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception { + private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { Producer p = new Producer(queue); Thread thread = new Thread(p); thread.start(); // producer will timestamp start - MpmcExchangerQueue consumer = queue; - Object result; + MpmcArrayQueue consumer = queue; + Node result; int i = REPETITIONS; + int queueEmpty = 0; do { - result = consumer.take(); + while (null == (result = consumer.poll())) { + queueEmpty++; + Thread.yield(); + } } while (0 != --i); long end = System.nanoTime(); @@ -65,27 +69,45 @@ public class MpmcQueueAltPerfTest { long duration = end - p.start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); + System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, + qName, result.item1, queueEmpty, p.queueFull); return ops; } + @SuppressWarnings("rawtypes") public static class Producer implements Runnable { - private final MpmcExchangerQueue queue; + private final MpmcArrayQueue queue; + int queueFull = 0; long start; - public Producer(MpmcExchangerQueue queue) { + public Producer(MpmcArrayQueue queue) { this.queue = queue; } @Override public void run() { - MpmcExchangerQueue producer = this.queue; + MpmcArrayQueue producer = this.queue; int i = REPETITIONS; + int f = 0; long s = System.nanoTime(); + MpmcArrayQueue pool = new MpmcArrayQueue(2); + pool.offer(new Node()); + pool.offer(new Node()); + + Node node; do { - producer.put(TEST_VALUE); + node = pool.poll(); + node.item1 = TEST_VALUE; + + while (!producer.offer(node)) { + Thread.yield(); + f++; + } + + pool.offer(node); } while (0 != --i); + this.queueFull = f; this.start = s; } }