working, but slow. Using MPMC array queue

This commit is contained in:
nathan 2015-04-20 21:25:59 +02:00
parent 42230706c7
commit eacca670de
5 changed files with 246 additions and 253 deletions

View File

@ -1,6 +1,5 @@
package dorkbox.util.messagebus.common.simpleq; package dorkbox.util.messagebus.common.simpleq;
import com.lmax.disruptor.MessageType;
// mpmc sparse.shift = 2, for this to be fast. // mpmc sparse.shift = 2, for this to be fast.
@ -9,12 +8,11 @@ abstract class PrePad {
} }
abstract class ColdItems { abstract class ColdItems {
public short type = MessageType.ONE; // public short type = MessageType.ONE;
public boolean isConsumer = false;
public Object item1 = null; public Object item1 = null;
public Object item2 = null; // public Object item2 = null;
public Object item3 = null; // public Object item3 = null;
public Object[] item4 = null; // public Object[] item4 = null;
} }
abstract class Pad0 extends ColdItems { abstract class Pad0 extends ColdItems {
@ -22,7 +20,7 @@ abstract class Pad0 extends ColdItems {
} }
abstract class HotItem1 extends ColdItems { abstract class HotItem1 extends ColdItems {
public volatile Thread thread; // public volatile Thread thread;
} }
public class Node extends HotItem1 { public class Node extends HotItem1 {

View File

@ -9,43 +9,16 @@ import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerFiel
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> { public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
private final static long THREAD;
private static final long ITEM1_OFFSET; private static final long ITEM1_OFFSET;
private static final long TYPE;
private static final long CONSUMER;
static { static {
try { try {
CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
} catch (NoSuchFieldException e) { } catch (NoSuchFieldException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private static final void spIsConsumer(Object node, boolean value) {
UNSAFE.putBoolean(node, CONSUMER, value);
}
private static final boolean lpIsConsumer(Object node) {
return UNSAFE.getBoolean(node, CONSUMER);
}
private static final boolean lvIsConsumer(Object node) {
return UNSAFE.getBooleanVolatile(node, CONSUMER);
}
private static final void spType(Object node, short type) {
UNSAFE.putShort(node, TYPE, type);
}
private static final short lpType(Object node) {
return UNSAFE.getShort(node, TYPE);
}
private static final void soItem1(Object node, Object item) { private static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item); UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item);
} }
@ -62,37 +35,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
return UNSAFE.getObject(node, ITEM1_OFFSET); return UNSAFE.getObject(node, ITEM1_OFFSET);
} }
private static final Thread lvThread(Object node) {
return (Thread) UNSAFE.getObjectVolatile(node, THREAD);
}
private static final Thread lpThread(Object node) {
return (Thread) UNSAFE.getObject(node, THREAD);
}
private static final Object cancelledMarker = new Object();
private static final boolean lpIsCanceled(Object node) {
return cancelledMarker == UNSAFE.getObject(node, THREAD);
}
private static final void spIsCancelled(Object node) {
UNSAFE.putObject(node, THREAD, cancelledMarker);
}
private static final void soThread(Object node, Thread newValue) {
UNSAFE.putOrderedObject(node, THREAD, newValue);
}
private static final void spThread(Object node, Thread newValue) {
UNSAFE.putObject(node, THREAD, newValue);
}
private static final boolean casThread(Object node, Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue);
}
/** The number of CPUs */ /** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -104,7 +46,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems. * The value here is approximately the average of those across a range of tested systems.
*/ */
private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000
/** /**
* The number of times to spin before blocking in timed waits. * The number of times to spin before blocking in timed waits.
@ -114,7 +56,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
* a constant. * a constant.
*/ */
static final int maxTimedSpins = NCPU < 2 ? 0 : 32; static final int maxTimedSpins = NCPU < 2 ? 0 : 32;
static final int negMaxTimedSpins = -maxTimedSpins;
/** /**
* The number of times to spin before blocking in untimed waits. * The number of times to spin before blocking in untimed waits.
@ -122,6 +63,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
* faster since they don't need to check times on each spin. * faster since they don't need to check times on each spin.
*/ */
static final int maxUntimedSpins = maxTimedSpins * 16; static final int maxUntimedSpins = maxTimedSpins * 16;
static final int negMaxUntimedSpins = -maxUntimedSpins;
/** /**
* The number of nanoseconds for which it is faster to spin * The number of nanoseconds for which it is faster to spin
@ -129,8 +71,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
*/ */
static final long spinForTimeoutThreshold = 1000L; static final long spinForTimeoutThreshold = 1000L;
long p40, p41, p42, p43, p44, p45, p46; long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37; long p30, p31, p32, p33, p34, p35, p36, p37;
@ -167,15 +107,17 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException { private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException {
boolean isConsumer = item == null; final Boolean isConsumer = Boolean.valueOf(item == null);
final boolean consumerBoolValue = isConsumer.booleanValue();
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final long capacity = this.mask + 1; final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex; // values we shouldn't reach
long currentProducerIndex; long currentConsumerIndex = -1;
long currentProducerIndex = Long.MAX_VALUE;
long cSeqOffset; long cSeqOffset;
long pSeqOffset; long pSeqOffset;
@ -187,90 +129,59 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
boolean empty = false; boolean empty = false;
while (true) { while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad
// empty or same mode // empty or same mode
// check what was last placed on the queue // push+park onto queue
if (currentProducerIndex == currentConsumerIndex) {
empty = true;
} else {
final long previousProducerIndex = currentProducerIndex - 1;
final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask); // we add ourselves to the queue and check status (maybe we park OR we undo, pop-consumer, and unpark)
final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long ppDelta = ppSeq - previousProducerIndex; pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
pDelta = pSeq - currentProducerIndex;
if (ppDelta == 1) { if (pDelta == 0) {
// same mode check // this is expected if we see this first time around
final long nextProducerIndex = currentProducerIndex + 1;
// on 64bit(no compressed oops) JVM this is the same as seqOffset if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
final long offset = calcElementOffset(previousProducerIndex, mask); // Successful CAS: full barrier
Object element = lpElement(offset);
sameMode = lpIsConsumer(element) == isConsumer;
} else if (ppDelta < 1 && // slot has not been moved by producer
currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// is empty
empty = true;
} else {
// hasn't been moved yet. retry 2
busySpin();
continue;
}
}
if (empty || sameMode) { // it is possible that two threads check the queue at the exact same time,
// push+park onto queue // BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
currentConsumerIndex = lvConsumerIndex();
empty = currentProducerIndex == currentConsumerIndex;
// we add ourselves to the queue and wait if (!empty) {
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); final long previousProducerIndex = currentProducerIndex - 1;
pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
pDelta = pSeq - currentProducerIndex;
if (pDelta == 0) {
// this is expected if we see this first time around
final long nextProducerIndex = currentProducerIndex + 1;
if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
// Successful CAS: full barrier
final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask);
final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad
final long ppDelta = ppSeq - previousProducerIndex;
// it is possible that two threads check the queue at the exact same time, if (ppDelta == 1) {
// BOTH can think that the queue is empty, resulting in a deadlock between threads // same mode check
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
currentConsumerIndex = lvConsumerIndex();
if (empty && currentProducerIndex != currentConsumerIndex) { // on 64bit(no compressed oops) JVM this is the same as seqOffset
// RESET the push of this element. final long offset = calcElementOffset(previousProducerIndex, mask);
empty = false; sameMode = lpElementType(offset) == isConsumer;
casProducerIndex(nextProducerIndex, currentProducerIndex);
continue;
} else if (sameMode && currentProducerIndex == currentConsumerIndex) {
sameMode = false;
casProducerIndex(nextProducerIndex, currentProducerIndex);
continue;
} }
}
if (empty || sameMode) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask); final long offset = calcElementOffset(currentProducerIndex, mask);
final Object element = lpElement(offset); spElementType(offset, isConsumer);
spIsConsumer(element, isConsumer); spElementThread(offset, Thread.currentThread());
spThread(element, Thread.currentThread());
if (isConsumer) { final Object element = lpElement(offset);
if (consumerBoolValue) {
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait // now we wait
park(element, timed, nanos); park(element, offset, timed, nanos);
return lvItem1(element); return lvItem1(element);
} else { } else {
spItem1(element, item); spItem1(element, item);
@ -280,99 +191,103 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait // now we wait
park(element, timed, nanos); park(element, offset, timed, nanos);
return null; return null;
} }
}
// failed cas, retry 1
} else if (pDelta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return;
busySpin();
}
} else {
// complimentary mode
// get item
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long nextConsumerIndex = currentConsumerIndex + 1;
final long cDelta = cSeq - nextConsumerIndex;
if (cDelta == 0) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Object element = lpElement(offset);
final Thread thread = lpThread(element);
if (isConsumer) {
if (thread == null || // is cancelled/fulfilled already
!casThread(element, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
continue;
}
// success
Object item1 = lpItem1(element);
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return item1;
}
continue;
} else { } else {
soItem1(element, item); // complimentary mode
if (thread == null || // is cancelled/fulfilled already // undo my push, since we will be poping off the queue instead
!casThread(element, thread, null)) { // failed cas state casProducerIndex(nextProducerIndex, currentProducerIndex);
// pop off queue while (true) {
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { // get item
// Successful CAS: full barrier cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long nextConsumerIndex = currentConsumerIndex + 1;
final long cDelta = cSeq - nextConsumerIndex;
if (cDelta == 0) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Thread thread = lpElementThread(offset);
if (consumerBoolValue) {
if (thread == null || // is cancelled/fulfilled already
!casElementThread(offset, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
continue;
}
// success
final Object element = lpElement(offset);
Object item1 = lpItem1(element);
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return item1;
}
busySpin();
// failed CAS
continue;
} else {
final Object element = lpElement(offset);
soItem1(element, item);
if (thread == null || // is cancelled/fulfilled already
!casElementThread(offset, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
// lost CAS
busySpin();
continue;
}
// success
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return null;
}
// lost CAS
busySpin();
continue;
}
} }
continue;
} }
// success
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return null;
}
// lost CAS
busySpin();
continue;
} }
} else if (cDelta < 0 && // slot has not been moved by producer
currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
busySpin();
} }
// failed cas, retry 1
} else if (pDelta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return;
} }
// contention. // contention.
busySpin(); busySpin();
} }
@ -380,7 +295,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
private static final void busySpin() { private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch // busy spin for the amount of time (roughly) of a CPU context switch
int spins = maxUntimedSpins; int spins = SPINS;
for (;;) { for (;;) {
if (spins > 0) { if (spins > 0) {
--spins; --spins;
@ -395,7 +310,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
* @return * @return
* @return false if we were interrupted, true if we were unparked by another thread * @return false if we were interrupted, true if we were unparked by another thread
*/ */
private static final boolean park(Object myNode, boolean timed, long nanos) throws InterruptedException { private final boolean park(Object myNode, long myOffset, boolean timed, long nanos) throws InterruptedException {
// long lastTime = timed ? System.nanoTime() : 0; // long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins; // int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = maxUntimedSpins; int spins = maxUntimedSpins;
@ -413,51 +328,26 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
// busy spin for the amount of time (roughly) of a CPU context switch // busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary) // then park (if necessary)
int spin = spins;
for (;;) { for (;;) {
if (lvThread(myNode) == null) { if (lvElementThread(myOffset) == null) {
return true; return true;
} else if (spin > 0) { } else if (spins > 0) {
--spin; --spins;
} else if (spin > negMaxTimedSpins) { } else if (spins > negMaxUntimedSpins) {
--spins;
LockSupport.parkNanos(1); LockSupport.parkNanos(1);
} else { } else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park(); LockSupport.park();
if (myThread.isInterrupted()) { if (myThread.isInterrupted()) {
casThread(myNode, myThread, null); casElementThread(myOffset, myThread, null);
return false; return false;
} }
} }
} }
} }
// /**
// * Unparks the other node (if it was waiting)
// */
// private static final void unpark(Object otherNode) {
// Thread myThread = Thread.currentThread();
// Thread thread;
//
// for (;;) {
// thread = getThread(otherNode);
// if (threadCAS(otherNode, thread, myThread)) {
// if (thread == null) {
// // no parking (UNPARK won the race)
// return;
// } else if (thread != myThread) {
// // park will always set the waiter back to null
// LockSupport.unpark(thread);
// return;
// } else {
// // contention
// busySpin();
// }
// }
// }
// }
@Override @Override
public boolean offer(Node message) { public boolean offer(Node message) {
return false; return false;

View File

@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> imp
* @param <E> * @param <E>
*/ */
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> { public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
protected static final int BUFFER_PAD; protected static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE; private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT; private static final int REF_ELEMENT_SHIFT;
@ -63,6 +63,8 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
protected final long mask; protected final long mask;
// @Stable :( // @Stable :(
protected final E[] buffer; protected final E[] buffer;
protected final Boolean[] typeBuffer;
protected final Thread[] threadBuffer;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) { public ConcurrentCircularArrayQueue(int capacity) {
@ -70,6 +72,8 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
this.mask = actualCapacity - 1; this.mask = actualCapacity - 1;
// pad data on either end with some empty slots. // pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
this.typeBuffer = new Boolean[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
this.threadBuffer = new Thread[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
} }
/** /**
@ -109,6 +113,48 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
UNSAFE.putObject(buffer, offset, e); UNSAFE.putObject(buffer, offset, e);
} }
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElementType(long offset, Boolean e) {
spElementType(this.typeBuffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElementType(Boolean[] buffer, long offset, Boolean e) {
UNSAFE.putObject(buffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElementThread(long offset, Thread e) {
spElementThread(this.threadBuffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element TYPE to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElementThread(Thread[] buffer, long offset, Thread e) {
UNSAFE.putObject(buffer, offset, e);
}
/** /**
* An ordered store(store + StoreStore barrier) of an element to a given offset * An ordered store(store + StoreStore barrier) of an element to a given offset
* *
@ -152,6 +198,34 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
return (E) UNSAFE.getObject(buffer, offset); return (E) UNSAFE.getObject(buffer, offset);
} }
/**
* A plain load (no ordering/fences) of an element TYPE from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Boolean lpElementType(long offset) {
return lpElementType(this.typeBuffer, offset);
}
protected final Boolean lpElementType(Boolean[] buffer, long offset) {
return (Boolean) UNSAFE.getObject(buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element THREAD from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lpElementThread(long offset) {
return lpElementThread(this.threadBuffer, offset);
}
protected final Thread lpElementThread(Thread[] buffer, long offset) {
return (Thread) UNSAFE.getObject(buffer, offset);
}
/** /**
* A volatile load (load + LoadLoad barrier) of an element from a given offset. * A volatile load (load + LoadLoad barrier) of an element from a given offset.
* *
@ -174,6 +248,33 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
return (E) UNSAFE.getObjectVolatile(buffer, offset); return (E) UNSAFE.getObjectVolatile(buffer, offset);
} }
/**
* A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lvElementThread(long offset) {
return lvElementThread(this.threadBuffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element THREAD from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Thread lvElementThread(Thread[] buffer, long offset) {
return (Thread) UNSAFE.getObjectVolatile(buffer, offset);
}
protected final boolean casElementThread(long offset, Thread expect, Thread newValue) {
return UNSAFE.compareAndSwapObject(this.threadBuffer, offset, expect, newValue);
}
@Override @Override
public Iterator<E> iterator() { public Iterator<E> iterator() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -21,6 +21,10 @@ public abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad
return this.consumerIndex; return this.consumerIndex;
} }
protected final long lpConsumerIndex() {
return UNSAFE.getLong(this, C_INDEX_OFFSET);
}
protected final boolean casConsumerIndex(long expect, long newValue) { protected final boolean casConsumerIndex(long expect, long newValue) {
return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
} }

View File

@ -23,7 +23,7 @@ import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
public class SimpleQueueAltPerfTest { public class SimpleQueueAltPerfTest {
// 15 == 32 * 1024 // 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);