WIP, code polish. discovered rare test hang

This commit is contained in:
nathan 2015-05-08 17:32:19 +02:00
parent b967733a63
commit 7fa59650a8
4 changed files with 214 additions and 96 deletions

View File

@ -17,7 +17,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue; import java.util.concurrent.TransferQueue;
import org.jctools.queues.MpmcArrayQueue; import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.Pow2;
import org.jctools.util.UnsafeAccess; import org.jctools.util.UnsafeAccess;
@ -29,9 +28,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
/** Is it multi-processor? */ /** Is it multi-processor? */
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
private static int INPROGRESS_SPINS = MP ? 32 : 0; private static final int INPROGRESS_SPINS = MP ? 32 : 0;
private static int PUSH_SPINS = MP ? 512 : 0; private static final int PUSH_SPINS = MP ? 32 : 0;
private static int POP_SPINS = MP ? 512 : 0; private static final int POP_SPINS = MP ? 512 : 0;
/** /**
@ -41,14 +40,14 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
* seems not to vary with number of CPUs (beyond 2) so is just * seems not to vary with number of CPUs (beyond 2) so is just
* a constant. * a constant.
*/ */
private static int PARK_TIMED_SPINS = MP ? 32 : 0; private static final int PARK_TIMED_SPINS = MP ? 32 : 0;
/** /**
* The number of times to spin before blocking in untimed waits. * The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin * This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin. * faster since they don't need to check times on each spin.
*/ */
private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; private static final int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16;
/** /**
* The number of nanoseconds for which it is faster to spin * The number of nanoseconds for which it is faster to spin
@ -59,11 +58,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
private final int consumerCount; private final int consumerCount;
public MpmcTransferArrayQueue(final int consumerCount) { public MpmcTransferArrayQueue(final int consumerCount) {
this(consumerCount, (int) Math.pow(Runtime.getRuntime().availableProcessors(),2)); super(1024); // must be power of 2
}
public MpmcTransferArrayQueue(final int consumerCount, final int queueSize) {
super(Pow2.roundToPowerOfTwo(queueSize));
this.consumerCount = consumerCount; this.consumerCount = consumerCount;
} }
@ -95,6 +90,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
public final Object take() { public final Object take() {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long consumerIndex; long consumerIndex;
@ -105,12 +101,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY; lastType = TYPE_EMPTY;
previousElement = null;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -120,11 +114,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
lastType = lpType(previousElement); lastType = lpType(previousElement);
} }
switch (lastType) { if (lastType != TYPE_PRODUCER) {
case TYPE_EMPTY:
case TYPE_CONSUMER: {
// if (lastType != TYPE_PRODUCER) {
// TYPE_EMPTY, TYPE_CONSUMER // TYPE_EMPTY, TYPE_CONSUMER
// empty or same mode = push+park onto queue // empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask); long pSeqOffset = calcSequenceOffset(producerIndex, mask);
@ -133,7 +123,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (delta == 0) { if (delta == 0) {
// this is expected if we see this first time around // this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) { final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
final Thread myThread = Thread.currentThread(); final Thread myThread = Thread.currentThread();
@ -145,12 +136,12 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask); final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node); spElement(buffer, offset, node);
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
park(node, myThread, false, 0L); park(node, myThread, false, 0L);
Object item1 = lvItem1(node); Object item1 = lvItem1(node);
@ -161,28 +152,26 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// whoops, inconsistent state // whoops, inconsistent state
busySpin(PUSH_SPINS); busySpin(PUSH_SPINS);
continue; } else {
}
// else {
case TYPE_PRODUCER: {
// TYPE_PRODUCER // TYPE_PRODUCER
// complimentary mode = pop+unpark off queue // complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask); long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1); final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) { if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask); final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset); final Object e = lpElement(buffer, offset);
spElement(offset, null); spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer // Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2) // (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
final Object lvItem1 = lpItem1(e); final Object lvItem1 = lpItem1(e);
unpark(e); unpark(e);
@ -193,8 +182,111 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// whoops, inconsistent state // whoops, inconsistent state
busySpin(POP_SPINS); busySpin(POP_SPINS);
continue;
} }
}
}
/**
* CONSUMER
* <p>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* as long as necessary
*/
public final Object take(final Node node) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
if (lastType != TYPE_PRODUCER) {
// TYPE_EMPTY, TYPE_CONSUMER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
// final Object node = nodeThreadLocal.get();
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(buffer, offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
park(node, myThread, false, 0L);
Object item1 = lvItem1(node);
return item1;
}
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
} else {
// TYPE_PRODUCER
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) {
if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(buffer, offset);
spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
final Object lvItem1 = lpItem1(e);
unpark(e);
return lvItem1;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
} }
} }
} }
@ -206,6 +298,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final long capacity = mask + 1; final long capacity = mask + 1;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long producerIndex; long producerIndex;
@ -221,17 +314,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (delta == 0) { if (delta == 0) {
// this is expected if we see this first time around // this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) { final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask); final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item); spElement(buffer, offset, item);
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return true; return true;
} }
@ -259,6 +353,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final long capacity = mask + 1; final long capacity = mask + 1;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long producerIndex; long producerIndex;
@ -274,17 +369,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (delta == 0) { if (delta == 0) {
// this is expected if we see this first time around // this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) { final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask); final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item); spElement(buffer, offset, item);
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return true; return true;
} }
@ -320,6 +416,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final long capacity = mask + 1; final long capacity = mask + 1;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long producerIndex; long producerIndex;
@ -335,17 +432,18 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (delta == 0) { if (delta == 0) {
// this is expected if we see this first time around // this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) { final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask); final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item); spElement(buffer, offset, item);
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return; return;
} }
@ -370,6 +468,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
public Object poll() { public Object poll() {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long consumerIndex; long consumerIndex;
@ -380,20 +479,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); // LoadLoad consumerIndex = lvConsumerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(consumerIndex, mask); cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1); final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) { if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask); final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset); final Object e = lpElement(buffer, offset);
spElement(offset, null); spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer // Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2) // (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
return e; return e;
} }
@ -421,12 +521,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
@Override @Override
public Object peek() { public Object peek() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long currConsumerIndex; long currConsumerIndex;
Object e; Object e;
do { do {
currConsumerIndex = lvConsumerIndex(); currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty // other consumers may have grabbed the element, or queue might be empty
e = lpElement(calcElementOffset(currConsumerIndex)); e = lpElement(buffer, calcElementOffset(currConsumerIndex, mask));
// only return null if queue is empty // only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex()); } while (e == null && currConsumerIndex != lvProducerIndex());
return e; return e;
@ -455,6 +559,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
public boolean tryTransfer(Object item) { public boolean tryTransfer(Object item) {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long consumerIndex; long consumerIndex;
@ -465,12 +570,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY; lastType = TYPE_EMPTY;
previousElement = null;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -499,20 +602,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// complimentary mode = pop+unpark off queue // complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask); long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1); final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) { if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask); final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset); final Object e = lpElement(buffer, offset);
spElement(offset, null); spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer // Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2) // (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
soItem1(e, item); soItem1(e, item);
unpark(e); unpark(e);
@ -540,6 +644,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long consumerIndex; long consumerIndex;
@ -550,20 +655,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); // LoadLoad consumerIndex = lvConsumerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(consumerIndex, mask); cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1); final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) { if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask); final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset); final Object e = lpElement(buffer, offset);
spElement(offset, null); spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer // Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2) // (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
return e; return e;
} }
@ -645,6 +751,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
@Override @Override
public boolean hasWaitingConsumer() { public boolean hasWaitingConsumer() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long consumerIndex; long consumerIndex;
long producerIndex; long producerIndex;
@ -652,11 +762,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
return false; return false;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -670,6 +779,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
@Override @Override
public int getWaitingConsumerCount() { public int getWaitingConsumerCount() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long consumerIndex; long consumerIndex;
long producerIndex; long producerIndex;
@ -677,11 +790,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
return 0; return 0;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -699,6 +811,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
public final boolean hasPendingMessages() { public final boolean hasPendingMessages() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long consumerIndex; long consumerIndex;
long producerIndex; long producerIndex;
@ -706,11 +822,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
return true; return true;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -754,12 +869,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
spins = PARK_UNTIMED_SPINS; spins = PARK_UNTIMED_SPINS;
} }
if (spins > 0) { randomYields = ThreadLocalRandom.current();
randomYields = ThreadLocalRandom.current();
}
} else if (spins > 0) { } else if (spins > 0) {
if (randomYields.nextInt(256) == 0) { if (randomYields.nextInt(1024) == 0) {
Thread.yield(); // occasionally yield UnsafeAccess.UNSAFE.park(false, 1L);
} }
--spins; --spins;
} else if (timed) { } else if (timed) {
@ -768,7 +881,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
lastTime = now; lastTime = now;
if (remaining > 0) { if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) { if (remaining < SPIN_THRESHOLD) {
// a park is too slow for this number, so just busy spin instead // a park is too slow for this time, so just busy spin instead
busySpin(PARK_UNTIMED_SPINS); busySpin(PARK_UNTIMED_SPINS);
} else { } else {
UnsafeAccess.UNSAFE.park(false, nanos); UnsafeAccess.UNSAFE.park(false, nanos);
@ -796,6 +909,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
private final boolean producerXfer(final Object item, final boolean timed, final long nanos) { private final boolean producerXfer(final Object item, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
long consumerIndex; long consumerIndex;
@ -806,12 +920,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
consumerIndex = lvConsumerIndex(); consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex(); producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY; lastType = TYPE_EMPTY;
previousElement = null;
} else { } else {
previousElement = lpElement(calcElementOffset(producerIndex-1)); final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -830,7 +942,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (delta == 0) { if (delta == 0) {
// this is expected if we see this first time around // this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) { final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
final Thread myThread = Thread.currentThread(); final Thread myThread = Thread.currentThread();
@ -843,12 +956,12 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask); final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node); spElement(buffer, offset, node);
// increment sequence by 1, the value expected by consumer // increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return park(node, myThread, timed, nanos); return park(node, myThread, timed, nanos);
} }
@ -861,20 +974,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// complimentary mode = pop+unpark off queue // complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask); long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1); final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) { if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier // Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask); final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset); final Object e = lpElement(buffer, offset);
spElement(offset, null); spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer // Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2) // (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
soItem1(e, item); soItem1(e, item);
unpark(e); unpark(e);

View File

@ -2,16 +2,14 @@ package dorkbox.util.messagebus.common.simpleq.jctools;
import org.jctools.util.UnsafeAccess; import org.jctools.util.UnsafeAccess;
import dorkbox.util.messagebus.common.simpleq.MessageType;
abstract class ColdItems { abstract class ColdItems {
public int type = 0; public int type = 0;
public int messageType = MessageType.ONE; // public int messageType = MessageType.ONE;
public Object item1 = null; public Object item1 = null;
public Object item2 = null; // public Object item2 = null;
public Object item3 = null; // public Object item3 = null;
public Object[] item4 = null; // public Object[] item4 = null;
} }
abstract class Pad0 extends ColdItems { abstract class Pad0 extends ColdItems {

View File

@ -7,26 +7,31 @@ import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node; import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class PerfTest_MpmcTransferArrayQueue { public class PerfTest_MpmcTransferArrayQueue {
// static {
// System.setProperty("sparse.shift", "2");
// }
public static final int REPETITIONS = 50 * 1000 * 100; public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17; public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 4; private static final int concurrency = 2;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails()); System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 2; final int warmupRuns = 4;
final int runs = 5; final int runs = 5;
long average = 0; long average = 0;
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
average = averageRun(warmupRuns, runs, queue); average = averageRun(warmupRuns, runs, queue);
// SimpleQueue.INPROGRESS_SPINS = 64; // SimpleQueue.INPROGRESS_SPINS = 64;
@ -151,8 +156,9 @@ public class PerfTest_MpmcTransferArrayQueue {
Object result = null; Object result = null;
int i = REPETITIONS; int i = REPETITIONS;
Node node = new Node();
do { do {
result = consumer.take(); result = consumer.take(node);
} while (0 != --i); } while (0 != --i);
this.result = result; this.result = result;

View File

@ -11,12 +11,12 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
private static final int concurrency = 1; private static final int concurrency = 1;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final int warmupRuns = 2; final int warmupRuns = 5;
final int runs = 5; final int runs = 5;
long average = 0; long average = 0;
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(QUEUE_CAPACITY); final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency, QUEUE_CAPACITY);
average = averageRun(warmupRuns, runs, queue); average = averageRun(warmupRuns, runs, queue);
// SimpleQueue.INPROGRESS_SPINS = 64; // SimpleQueue.INPROGRESS_SPINS = 64;