WIP - got mpmc queue w/ parks + fixed-deadlocking. @ 4.6m ops/sec

This commit is contained in:
nathan 2015-04-22 22:47:23 +02:00
parent 8b23c992e1
commit 4276f881e8
5 changed files with 584 additions and 413 deletions

View File

@ -2,20 +2,25 @@ package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node> {
public static final int TYPE_FREE = 0;
public static final int TYPE_CONSUMER = 1;
public static final int TYPE_PRODUCER = 2;
public static final int TYPE_CANCELED = 3;
// private final static long THREAD;
// private static final long TYPE_OFFSET;
// private static final long IS_CONSUMER;
private static final long ITEM1_OFFSET;
static {
try {
// TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
// IS_CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
// THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
} catch (NoSuchFieldException e) {
@ -31,6 +36,15 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
return UNSAFE.getObject(node, ITEM1_OFFSET);
}
// static final boolean lpType(Object node) {
// return UNSAFE.getBoolean(node, IS_CONSUMER);
// }
//
// static final void spType(Object node, boolean isConsumer) {
// UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer);
// }
// public final Thread get() {
// return (Thread) UNSAFE.getObject(this, THREAD);
// }
@ -45,39 +59,26 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
// }
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
* The number of times to spin (with randomly interspersed calls
* to Thread.yield) on multiprocessor before blocking when a node
* is apparently the first waiter in the queue. See above for
* explanation. Must be a power of two. The value is empirically
* derived -- it works pretty well across a variety of processors,
* numbers of CPUs, and OSes.
*/
private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000
/** The number of slots in the elimination array. */
private static final int ARENA_LENGTH = Pow2.roundToPowerOfTwo((NCPU + 1) / 2);
/** The mask value for indexing into the arena. */
private static int ARENA_MASK = ARENA_LENGTH - 1;
/** The number of times to step ahead, probe, and try to match. */
private static final int LOOKAHEAD = Math.min(4, NCPU);
/** The number of times to spin per lookahead step */
private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
/** A marker indicating that the arena slot is free. */
private static final Object FREE = null;
/** A marker indicating that a thread is waiting in that slot to be transfered an element. */
private static final Object WAITER = new Object();
/** The arena where slots can be used to perform an exchange. */
private final AtomicReference<Object>[] arena;
private static final int FRONT_SPINS = 1 << 7;
/**
* The number of times to spin before blocking when a node is
* preceded by another node that is apparently spinning. Also
* serves as an increment to FRONT_SPINS on phase changes, and as
* base average frequency for yielding during spins. Must be a
* power of two.
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
@ -95,145 +96,357 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElementType(elementOffset, TYPE_FREE);
soElement(elementOffset, new Node());
}
this.arena = new PaddedAtomicReference[ARENA_LENGTH];
for (int i = 0; i < ARENA_LENGTH; i++) {
this.arena[i] = new PaddedAtomicReference<Object>();
}
}
/**
* PRODUCER
*/
public void put(Object item) {
final Thread thread = Thread.currentThread();
public Object xfer(final Object item, final boolean timed, final long nanos, final int incomingType) throws InterruptedException {
// empty or same mode = push+park onto queue
// complimentary mode = unpark+pop off queue
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
// long currentConsumerIndex;
long currentProducerIndex;
long currentProducerIndex = -1; // start with bogus value, hope we don't need it
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
long currentConsumerIndex = Long.MAX_VALUE; // start with bogus value, hope we don't need it
long cSeqOffset;
int prevElementType;
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
// the sequence must be loaded before we get the previous node type, otherwise previousNodeType can be stale/
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
final long pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
// if the queue is EMPTY, this will return TYPE_FREE
// other consumers may have grabbed the element, or queue might be empty
// may be FREE or CONSUMER/PRODUCER
final long prevElementOffset = calcElementOffset(currentProducerIndex - 1, mask);
prevElementType = lpElementType(prevElementOffset);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
Object lpElement = lpElementNoCast(offset);
spItem1(lpElement, item);
if (prevElementType == TYPE_CANCELED) {
// pop off and continue.
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
return;
final long nextConsumerIndex = currentConsumerIndex + 1;
final long delta = seq - nextConsumerIndex;
if (delta == 0) {
if (prevElementType != lpElementType(prevElementOffset)) {
// inconsistent read of type.
continue;
}
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
spElementType(offset, TYPE_FREE);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// spin
busySpin(prevElementType, incomingType);
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return null;
// another consumer beat us and moved sequence ahead, retry 2
// only producer busyspins
continue;
}
// another producer has moved the sequence by one, retry 2
// only producer will busySpin if contention
busySpin();
// it is possible that two threads check the queue at the exact same time,
// BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
if (prevElementType == TYPE_FREE || prevElementType == incomingType) {
// empty or same mode = push+park onto queue
if (timed && nanos <= 0) {
// can't wait
return null;
}
final long delta = pSeq - currentProducerIndex;
if (delta == 0) {
if (prevElementType != lpElementType(prevElementOffset)) {
// inconsistent read of type.
continue;
}
// this is expected if we see this first time around
final long nextProducerIndex = currentProducerIndex + 1;
if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
if (prevElementType == TYPE_FREE && currentProducerIndex != currentConsumerIndex) {
// inconsistent read
// try to undo, if possible.
if (!casProducerIndex(nextProducerIndex, currentProducerIndex)) {
spElementType(offset, TYPE_CANCELED);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
}
continue;
}
spElementType(offset, incomingType);
spElementThread(offset, Thread.currentThread());
if (incomingType == TYPE_CONSUMER) {
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
park(offset, timed, nanos);
Object lpElement = lpElementNoCast(offset);
Object lpItem1 = lpItem1(lpElement);
return lpItem1;
} else {
// producer
Object lpElement = lpElementNoCast(offset);
spItem1(lpElement, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
park(offset, timed, nanos);
return null;
}
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return false;
// we spin if the queue is full
}
// another producer has moved the sequence by one, retry 2
// only producer will busySpin
busySpin(prevElementType, incomingType);
}
else {
// complimentary mode = unpark+pop off queue
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long nextConsumerIndex = currentConsumerIndex + 1;
final long delta = seq - nextConsumerIndex;
if (delta == 0) {
if (prevElementType != lpElementType(prevElementOffset)) {
// inconsistent read of type.
continue;
}
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
if (prevElementType != lpElementType(prevElementOffset)) {
System.err.println("WAHT");
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
continue;
}
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
spElementType(offset, TYPE_FREE);
final Object thread = lpElementThread(offset);
final Object lpElement = lpElementNoCast(offset);
if (incomingType == TYPE_CONSUMER) {
// is already cancelled/fulfilled
if (thread == null ||
!casElementThread(offset, thread, null)) { // FULL barrier
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
continue;
}
Object returnItem = lpItem1(lpElement);
spItem1(lpElement, null);
UNSAFE.unpark(thread);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return returnItem;
} else {
// producer
spItem1(lpElement, item);
// is already cancelled/fulfilled
if (thread == null ||
!casElementThread(offset, thread, null)) { // FULL barrier
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
continue;
}
UNSAFE.unpark(thread);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return null;
}
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// spin
busySpin(prevElementType, incomingType);
}
// another consumer beat us and moved sequence ahead, retry 2
// only producer busyspins
}
}
}
/**
* CONSUMER
*/
public Object take() {
final Thread thread = Thread.currentThread();
private static final void busySpin(int previousNodeType, int currentNodeType) {
ThreadLocalRandom randomYields = null; // bound if needed
randomYields = ThreadLocalRandom.current();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
// long currentProducerIndex;
long cSeqOffset;
long pIndex = -1; // start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentProducerIndex = lvProducerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currentConsumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Object e = lpElementNoCast(offset);
Object item = lpItem1(e);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore
return item;
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// contention.
busySpin();
}
// another consumer beat us and moved sequence ahead, retry 2
// only producer busyspins
}
}
private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
int spins = spinsFor(previousNodeType, currentNodeType);
// int spins = 512;
for (;;) {
if (spins > 0) {
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0) {
LockSupport.parkNanos(1); // occasionally yield
}
} else {
break;
}
}
}
/**
* Returns spin/yield value for a node with given predecessor and
* data mode. See above for explanation.
*/
private final static int spinsFor(int previousNodeType, int currentNodeType) {
// if (MP && pred != null) {
// if (previousNodeType != currentNodeType) {
// // in the process of changing modes
// return FRONT_SPINS + CHAINED_SPINS;
// }
// if (pred.isMatched()) {
// at the front of the queue
return FRONT_SPINS;
// }
// if (pred.waiter == null) {
// // previous is spinning
// return CHAINED_SPINS;
// }
// }
//
// return 0;
}
private final void park(long offset, final boolean timed, final long nanos) throws InterruptedException {
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = 2000;
Thread myThread = Thread.currentThread();
// if (timed) {
// long now = System.nanoTime();
// nanos -= now - lastTime;
// lastTime = now;
// if (nanos <= 0) {
//// s.tryCancel(e);
// continue;
// }
// }
// busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary)
for (;;) {
if (lpElementThread(offset) == null) {
return;
} else if (spins > 0) {
--spins;
// } else if (spins > negMaxUntimedSpins) {
// --spins;
// LockSupport.parkNanos(1);
} else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park();
if (myThread.isInterrupted()) {
casElementThread(offset, myThread, null);
Thread.interrupted();
throw new InterruptedException();
}
}
}
}
@Override
public boolean offer(Node message) {
return false;

View File

@ -13,6 +13,7 @@ abstract class ColdItems extends PrePad {
// private static AtomicInteger count = new AtomicInteger();
// public final int ID = count.getAndIncrement();
// public boolean isConsumer = false;
// public short type = MessageType.ONE;
public Object item1 = null;
// public Object item2 = null;
@ -26,7 +27,6 @@ abstract class Pad0 extends ColdItems {
}
abstract class HotItem1 extends Pad0 {
// public transient volatile boolean isConsumer = false;
}
abstract class Pad1 extends HotItem1 {

View File

@ -1,9 +1,10 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public final class SimpleQueue extends LinkedArrayList {
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
public final class SimpleQueue {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -40,18 +41,17 @@ public final class SimpleQueue extends LinkedArrayList {
*/
static final long spinForTimeoutThreshold = 1000L;
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
private MpmcExchangerQueue queue;
public SimpleQueue(final int size) {
super(size);
this.queue = new MpmcExchangerQueue(Pow2.roundToPowerOfTwo(size));
}
/**
* PRODUCER
*/
public void put(Object item) throws InterruptedException {
xfer(item, false, 0);
this.queue.xfer(item, false, 0, MpmcExchangerQueue.TYPE_PRODUCER);
}
@ -59,136 +59,138 @@ public final class SimpleQueue extends LinkedArrayList {
* CONSUMER
*/
public Object take() throws InterruptedException {
return xfer(null, false, 0);
// this.queue.xfer(123, false, 0, MpmcExchangerQueue.TYPE_PRODUCER);
// return 123;
return this.queue.xfer(null, false, 0, MpmcExchangerQueue.TYPE_CONSUMER);
}
private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException {
final boolean isConsumer= item == null;
while (true) {
// empty or same mode = push+park onto queue
// complimentary mode = unpark+pop off queue
final Object tail = lvTail(); // LoadLoad
final Object head = lvHead(); // LoadLoad
Object thread;
private Object xfer(Object item, boolean timed, long nanos, byte incomingType) throws InterruptedException {
return null;
// while (true) {
// empty or same mode = push+park onto queue
// complimentary mode = unpark+pop off queue
//
// Object thread;
//
// it is possible that two threads check the queue at the exact same time,
// BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
boolean empty = head == lpNext(tail);
boolean sameMode = lpType(tail) == isConsumer;
// check to make sure we are not "double dipping" on our head
thread = lpThread(head);
if (empty && thread != null) {
empty = false;
} else if (sameMode && thread == null) {
busySpin();
continue;
}
// empty or same mode = push+park onto queue
if (empty || sameMode) {
if (timed && nanos <= 0) {
// can't wait
return null;
}
final Object tNext = lpNext(tail);
if (tail != lvTail()) { // LoadLoad
// inconsistent read
busySpin();
continue;
}
if (isConsumer) {
spType(tNext, isConsumer);
spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in
// busySpin();
continue;
}
park(tNext, timed, nanos);
// this will only advance head if necessary
// final boolean empty = this.queue.isEmpty(); // LoadLoad
// boolean sameMode;
// if (!empty) {
// sameMode = this.queue.getLastMessageType() == isConsumer;
// }
//
// // check to make sure we are not "double dipping" on our head
//// thread = lpThread(head);
//// if (empty && thread != null) {
//// empty = false;
//// } else if (sameMode && thread == null) {
//// busySpin();
//// continue;
//// }
//
// // empty or same mode = push+park onto queue
// if (empty || sameMode) {
// if (timed && nanos <= 0) {
// // can't wait
// return null;
// }
//
// final Object tNext = lpNext(tail);
// if (tail != lvTail()) { // LoadLoad
// // inconsistent read
// busySpin();
// continue;
// }
//
// if (isConsumer) {
// spType(tNext, isConsumer);
// spThread(tNext, Thread.currentThread());
//
// if (!advanceTail(tail, tNext)) { // FULL barrier
// // failed to link in
//// busySpin();
// continue;
// }
//
// park(tNext, timed, nanos);
//
// // this will only advance head if necessary
//// advanceHead(tail, tNext);
// Object lpItem1 = lpItem1(tNext);
// spItem1(tNext, null);
// return lpItem1;
// } else {
// // producer
// spType(tNext, isConsumer);
// spItem1(tNext, item);
// spThread(tNext, Thread.currentThread());
//
// if (!advanceTail(tail, tNext)) { // FULL barrier
// // failed to link in
// continue;
// }
//
// park(tNext, timed, nanos);
//
// // this will only advance head if necessary
// advanceHead(tail, tNext);
Object lpItem1 = lpItem1(tNext);
spItem1(tNext, null);
return lpItem1;
} else {
// producer
spType(tNext, isConsumer);
spItem1(tNext, item);
spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in
continue;
}
park(tNext, timed, nanos);
// this will only advance head if necessary
advanceHead(tail, tNext);
return null;
}
}
// complimentary mode = unpark+pop off queue
else {
Object next = lpNext(head);
if (tail != lvTail() || head != lvHead()) { // LoadLoad
// inconsistent read
busySpin();
continue;
}
thread = lpThread(head);
if (isConsumer) {
Object returnVal = lpItem1(head);
// is already cancelled/fulfilled
if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier
// head was already used by a different thread
advanceHead(head, next); // FULL barrier
continue;
}
spItem1(head, null);
LockSupport.unpark((Thread) thread);
advanceHead(head, next); // FULL barrier
return returnVal;
} else {
// producer
spItem1(head, item); // StoreStore
// is already cancelled/fulfilled
if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier
// head was already used by a different thread
advanceHead(head, next); // FULL barrier
continue;
}
LockSupport.unpark((Thread) thread);
advanceHead(head, next); // FULL barrier
return null;
}
}
}
// return null;
// }
// }
// // complimentary mode = unpark+pop off queue
// else {
// Object next = lpNext(head);
//
// if (tail != lvTail() || head != lvHead()) { // LoadLoad
// // inconsistent read
// busySpin();
// continue;
// }
//
// thread = lpThread(head);
// if (isConsumer) {
// Object returnVal = lpItem1(head);
//
// // is already cancelled/fulfilled
// if (thread == null ||
// !casThread(head, thread, null)) { // FULL barrier
//
// // head was already used by a different thread
// advanceHead(head, next); // FULL barrier
//
// continue;
// }
//
// spItem1(head, null);
// LockSupport.unpark((Thread) thread);
//
// advanceHead(head, next); // FULL barrier
//
// return returnVal;
// } else {
// // producer
// spItem1(head, item); // StoreStore
//
// // is already cancelled/fulfilled
// if (thread == null ||
// !casThread(head, thread, null)) { // FULL barrier
//
// // head was already used by a different thread
// advanceHead(head, next); // FULL barrier
//
// continue;
// }
//
// LockSupport.unpark((Thread) thread);
//
// advanceHead(head, next); // FULL barrier
// return null;
// }
// }
// }
}
private static final void busySpin2() {
@ -215,44 +217,44 @@ public final class SimpleQueue extends LinkedArrayList {
}
}
private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException {
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = maxUntimedSpins;
Thread myThread = Thread.currentThread();
// if (timed) {
// long now = System.nanoTime();
// nanos -= now - lastTime;
// lastTime = now;
// if (nanos <= 0) {
//// s.tryCancel(e);
// continue;
// }
// }
// busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary)
for (;;) {
if (lpThread(myNode) == null) {
return;
// } else if (spins > 0) {
// private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException {
//// long lastTime = timed ? System.nanoTime() : 0;
//// int spins = timed ? maxTimedSpins : maxUntimedSpins;
// int spins = maxUntimedSpins;
// Thread myThread = Thread.currentThread();
//
//// if (timed) {
//// long now = System.nanoTime();
//// nanos -= now - lastTime;
//// lastTime = now;
//// if (nanos <= 0) {
////// s.tryCancel(e);
//// continue;
//// }
//// }
//
// // busy spin for the amount of time (roughly) of a CPU context switch
// // then park (if necessary)
// for (;;) {
// if (lpThread(myNode) == null) {
// return;
//// } else if (spins > 0) {
//// --spins;
// } else if (spins > negMaxUntimedSpins) {
// --spins;
} else if (spins > negMaxUntimedSpins) {
--spins;
// LockSupport.parkNanos(1);
} else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park();
if (myThread.isInterrupted()) {
casThread(myNode, myThread, null);
Thread.interrupted();
throw new InterruptedException();
}
}
}
}
//// LockSupport.parkNanos(1);
// } else {
// // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
// LockSupport.park();
//
// if (myThread.isInterrupted()) {
// casThread(myNode, myThread, null);
// Thread.interrupted();
// throw new InterruptedException();
// }
// }
// }
// }
// @Override
// public boolean isEmpty() {

View File

@ -45,6 +45,7 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
protected static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
static {
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
@ -99,18 +100,7 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param e a kitty
*/
protected final void spElement(long offset, E e) {
spElement(this.buffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElement(E[] buffer, long offset, E e) {
UNSAFE.putObject(buffer, offset, e);
UNSAFE.putObject(this.buffer, offset, e);
}
/**
@ -119,40 +109,29 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElementType(long offset, Boolean e) {
spElementType(this.typeBuffer, offset, e);
protected final void spElementType(long offset, int e) {
UNSAFE.putInt(this.typeBuffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
* An ordered store(store + StoreStore barrier) of an element TYPE to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElementType(Boolean[] buffer, long offset, Boolean e) {
UNSAFE.putObject(buffer, offset, e);
protected final void soElementType(long offset, int e) {
UNSAFE.putOrderedInt(this.typeBuffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
* A plain store (no ordering/fences) of an element THREAD to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElementThread(long offset, Thread e) {
spElementThread(this.threadBuffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElementThread(Thread[] buffer, long offset, Thread e) {
UNSAFE.putObject(buffer, offset, e);
UNSAFE.putObject(this.threadBuffer, offset, e);
}
/**
@ -162,18 +141,7 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param e an orderly kitty
*/
protected final void soElement(long offset, E e) {
soElement(this.buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(E[] buffer, long offset, E e) {
UNSAFE.putOrderedObject(buffer, offset, e);
UNSAFE.putOrderedObject(this.buffer, offset, e);
}
/**
@ -182,35 +150,19 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lpElement(long offset) {
return lpElement(this.buffer, offset);
}
protected final Object lpElementNoCast(long offset) {
return lpElementNoCast(this.buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lpElement(E[] buffer, long offset) {
return (E) UNSAFE.getObject(buffer, offset);
protected final E lpElement(long offset) {
return (E) UNSAFE.getObject(this.buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Object lpElementNoCast(E[] buffer, long offset) {
return UNSAFE.getObject(buffer, offset);
protected final Object lpElementNoCast(long offset) {
return UNSAFE.getObject(this.buffer, offset);
}
/**
@ -219,12 +171,18 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Boolean lpElementType(long offset) {
return lpElementType(this.typeBuffer, offset);
protected final int lpElementType(long offset) {
return UNSAFE.getInt(this.typeBuffer, offset);
}
protected final Boolean lpElementType(Boolean[] buffer, long offset) {
return (Boolean) UNSAFE.getObject(buffer, offset);
/**
* A volatile load (load + LoadLoad barrier) of an element TYPE from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final int lvElementType(long offset) {
return UNSAFE.getIntVolatile(this.typeBuffer, offset);
}
/**
@ -233,12 +191,8 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lpElementThread(long offset) {
return lpElementThread(this.threadBuffer, offset);
}
protected final Thread lpElementThread(Thread[] buffer, long offset) {
return (Thread) UNSAFE.getObject(buffer, offset);
protected final Object lpElementThread(long offset) {
return UNSAFE.getObject(this.threadBuffer, offset);
}
/**
@ -247,20 +201,9 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lvElement(long offset) {
return lvElement(this.buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lvElement(E[] buffer, long offset) {
return (E) UNSAFE.getObjectVolatile(buffer, offset);
protected final E lvElement(long offset) {
return (E) UNSAFE.getObjectVolatile(this.buffer, offset);
}
/**
@ -269,26 +212,17 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lvElementThread(long offset) {
return lvElementThread(this.threadBuffer, offset);
protected final Object lvElementThread(long offset) {
return UNSAFE.getObjectVolatile(this.threadBuffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lvElementThread(Thread[] buffer, long offset) {
return (Thread) UNSAFE.getObjectVolatile(buffer, offset);
}
protected final boolean casElementThread(long offset, Thread expect, Thread newValue) {
protected final boolean casElementThread(long offset, Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(this.threadBuffer, offset, expect, newValue);
}
protected final boolean casElementType(long offset, int expect, int newValue) {
return UNSAFE.compareAndSwapInt(this.typeBuffer, offset, expect, newValue);
}
@Override
public Iterator<E> iterator() {

View File

@ -18,12 +18,12 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
public class MpmcQueueAltPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
@ -33,7 +33,7 @@ public class MpmcQueueAltPerfTest {
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY);
final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -48,16 +48,20 @@ public class MpmcQueueAltPerfTest {
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception {
private static long performanceRun(int runNumber, MpmcArrayQueue<Node> queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
MpmcExchangerQueue consumer = queue;
Object result;
MpmcArrayQueue<Node> consumer = queue;
Node result;
int i = REPETITIONS;
int queueEmpty = 0;
do {
result = consumer.take();
while (null == (result = consumer.poll())) {
queueEmpty++;
Thread.yield();
}
} while (0 != --i);
long end = System.nanoTime();
@ -65,27 +69,45 @@ public class MpmcQueueAltPerfTest {
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result);
System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, result.item1, queueEmpty, p.queueFull);
return ops;
}
@SuppressWarnings("rawtypes")
public static class Producer implements Runnable {
private final MpmcExchangerQueue queue;
private final MpmcArrayQueue queue;
int queueFull = 0;
long start;
public Producer(MpmcExchangerQueue queue) {
public Producer(MpmcArrayQueue queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcExchangerQueue producer = this.queue;
MpmcArrayQueue producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();
MpmcArrayQueue<Node> pool = new MpmcArrayQueue<Node>(2);
pool.offer(new Node());
pool.offer(new Node());
Node node;
do {
producer.put(TEST_VALUE);
node = pool.poll();
node.item1 = TEST_VALUE;
while (!producer.offer(node)) {
Thread.yield();
f++;
}
pool.offer(node);
} while (0 != --i);
this.queueFull = f;
this.start = s;
}
}