WIP LinkedArrayList queue

This commit is contained in:
nathan 2015-04-21 15:44:15 +02:00
parent eacca670de
commit 2c87177b31
3 changed files with 325 additions and 261 deletions

View File

@ -0,0 +1,137 @@
package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
public class LinkedArrayList {
private static final long HEAD;
private static final long TAIL;
private static final long NEXT;
private static final long IS_CONSUMER;
private static final long IS_READY;
private static final long THREAD;
private static final long ITEM1;
static {
try {
HEAD = UNSAFE.objectFieldOffset(LinkedArrayList.class.getField("head"));
TAIL = UNSAFE.objectFieldOffset(LinkedArrayList.class.getField("tail"));
NEXT = UNSAFE.objectFieldOffset(Node.class.getField("next"));
IS_CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
IS_READY = UNSAFE.objectFieldOffset(Node.class.getField("isReady"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1, item);
}
static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1, item);
}
static final Object lvItem1(Object node) {
return UNSAFE.getObjectVolatile(node, ITEM1);
}
static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1);
}
final Object lvHead() {
return UNSAFE.getObjectVolatile(this, HEAD);
}
final Object lpHead() {
return UNSAFE.getObject(this, HEAD);
}
final Object lvTail() {
return UNSAFE.getObjectVolatile(this, TAIL);
}
final Object lpTail() {
return UNSAFE.getObject(this, TAIL);
}
final Object lpNext(Object node) {
return UNSAFE.getObject(node, NEXT);
}
final boolean advanceTail(Object expected, Object newTail) {
if (expected == lvTail()) {
return UNSAFE.compareAndSwapObject(this, TAIL, expected, newTail);
}
return false;
}
final boolean advanceHead(Object expected, Object newHead) {
if (expected == lvHead()) {
return UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead);
}
return false;
}
final Object lvThread(Object node) {
return UNSAFE.getObjectVolatile(node, THREAD);
}
final Object lpThread(Object node) {
return UNSAFE.getObject(node, THREAD);
}
final void spThread(Object node, Thread thread) {
UNSAFE.putObject(node, THREAD, thread);
}
final boolean casThread(Object node, Object expected, Object newThread) {
return UNSAFE.compareAndSwapObject(node, THREAD, expected, newThread);
}
final boolean lpType(Object node) {
return UNSAFE.getBoolean(node, IS_CONSUMER);
}
final void spType(Object node, boolean isConsumer) {
UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer);
}
final boolean lpIsReady(Object node) {
return UNSAFE.getBoolean(node, IS_READY);
}
final void spIsReady(Object node, boolean isReady) {
UNSAFE.putBoolean(node, IS_READY, isReady);
}
public Node head;
public Node tail;
private Node[] buffer;
public LinkedArrayList(int size) {
this.buffer = new Node[size];
// pre-fill our data structures. This just makes sure to have happy memory. we use linkedList style iteration
for (int i=0; i < size; i++) {
this.buffer[i] = new Node();
if (i > 0) {
this.buffer[i-1].next = this.buffer[i];
}
}
this.buffer[size-1].next = this.buffer[0];
this.tail = this.buffer[0];
this.head = this.tail.next;
}
}

View File

@ -1,13 +1,20 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicInteger;
// mpmc sparse.shift = 2, for this to be fast.
abstract class PrePad {
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class ColdItems {
private static AtomicInteger count = new AtomicInteger();
public final int ID = count.getAndIncrement();
public boolean isReady = false;
public boolean isConsumer = false;
// public short type = MessageType.ONE;
public Object item1 = null;
// public Object item2 = null;
@ -16,17 +23,30 @@ abstract class ColdItems {
}
abstract class Pad0 extends ColdItems {
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem1 extends ColdItems {
// public volatile Thread thread;
abstract class HotItem1 extends Pad0 {
public volatile Thread thread;
}
public class Node extends HotItem1 {
abstract class Pad1 extends HotItem1 {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem2 extends Pad1 {
public volatile Node next;
}
public class Node extends HotItem2 {
// post-padding
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() {
}
@Override
public String toString() {
return "[" + this.ID + "]";
}
}

View File

@ -1,40 +1,9 @@
package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
private static final long ITEM1_OFFSET;
static {
try {
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item);
}
private static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1_OFFSET, item);
}
private static final Object lvItem1(Object node) {
return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET);
}
private static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1_OFFSET);
}
public final class SimpleQueue extends LinkedArrayList {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -76,18 +45,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
public SimpleQueue(final int size) {
super(size);
// pre-fill our data structures
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
long currentProducerIndex;
for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
soElement(elementOffset, new Node());
}
}
/**
@ -106,190 +63,164 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException {
final Boolean isConsumer = Boolean.valueOf(item == null);
final boolean consumerBoolValue = isConsumer.booleanValue();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
// values we shouldn't reach
long currentConsumerIndex = -1;
long currentProducerIndex = Long.MAX_VALUE;
long cSeqOffset;
long pSeqOffset;
long pSeq;
long pDelta = -1;
boolean sameMode = false;
boolean empty = false;
final boolean isConsumer= item == null;
while (true) {
currentProducerIndex = lvProducerIndex(); // LoadLoad
// empty or same mode = push+park onto queue
// complimentary mode = unpark+pop off queue
// empty or same mode
// push+park onto queue
Object tail = lvTail(); // LoadLoad
Object head = lvHead(); // LoadLoad
Object thread;
// we add ourselves to the queue and check status (maybe we park OR we undo, pop-consumer, and unpark)
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
pDelta = pSeq - currentProducerIndex;
if (pDelta == 0) {
// this is expected if we see this first time around
final long nextProducerIndex = currentProducerIndex + 1;
if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
// Successful CAS: full barrier
// it is possible that two threads check the queue at the exact same time,
// BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
boolean empty = head == lpNext(tail);
boolean sameMode = lpType(tail) == isConsumer;
// empty or same mode = push+park onto queue
if (empty || sameMode) {
if (timed && nanos <= 0) {
// can't wait
return null;
}
// it is possible that two threads check the queue at the exact same time,
// BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is.
currentConsumerIndex = lvConsumerIndex();
empty = currentProducerIndex == currentConsumerIndex;
final Object tNext = lpNext(tail);
if (tail != lvTail()) { // LoadLoad
// inconsistent read
busySpin();
continue;
}
if (!empty) {
final long previousProducerIndex = currentProducerIndex - 1;
final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask);
final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad
final long ppDelta = ppSeq - previousProducerIndex;
if (ppDelta == 1) {
// same mode check
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(previousProducerIndex, mask);
sameMode = lpElementType(offset) == isConsumer;
}
thread = lpThread(head);
if (thread == null) {
if (sameMode) {
busySpin();
continue;
}
if (empty || sameMode) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
spElementType(offset, isConsumer);
spElementThread(offset, Thread.currentThread());
final Object element = lpElement(offset);
if (consumerBoolValue) {
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait
park(element, offset, timed, nanos);
return lvItem1(element);
} else {
spItem1(element, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait
park(element, offset, timed, nanos);
return null;
}
} else {
// complimentary mode
// undo my push, since we will be poping off the queue instead
casProducerIndex(nextProducerIndex, currentProducerIndex);
while (true) {
// get item
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long nextConsumerIndex = currentConsumerIndex + 1;
final long cDelta = cSeq - nextConsumerIndex;
if (cDelta == 0) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Thread thread = lpElementThread(offset);
if (consumerBoolValue) {
if (thread == null || // is cancelled/fulfilled already
!casElementThread(offset, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
continue;
}
// success
final Object element = lpElement(offset);
Object item1 = lpItem1(element);
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return item1;
}
busySpin();
// failed CAS
continue;
} else {
final Object element = lpElement(offset);
soItem1(element, item);
if (thread == null || // is cancelled/fulfilled already
!casElementThread(offset, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
}
// lost CAS
busySpin();
continue;
}
// success
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
LockSupport.unpark(thread);
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return null;
}
// lost CAS
busySpin();
continue;
}
}
}
} else {
if (empty) {
busySpin();
continue;
}
}
// failed cas, retry 1
} else if (pDelta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return;
}
// contention.
busySpin();
// if (sameMode && !lpIsReady(tNext)) {
// // A "node" is only ready (and valid for a "isConsumer check") once the "isReady" has been set.
// continue;
// } else if (empty && lpIsReady(tNext)) {
// // A "node" is only empty (and valid for a "isEmpty check") if the head node "isReady" has not been set (otherwise, head is still in progress)
// continue;
// }
if (isConsumer) {
spType(tNext, isConsumer);
spIsReady(tNext, true);
spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in
busySpin();
continue;
}
park(tNext, timed, nanos);
// this will only advance head if necessary
advanceHead(tail, tNext);
return lvItem1(tNext);
} else {
spType(tNext, isConsumer);
spItem1(tNext, item);
spIsReady(tNext, true);
spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in
busySpin();
continue;
}
park(tNext, timed, nanos);
// this will only advance head if necessary
advanceHead(tail, tNext);
return null;
}
}
// complimentary mode = unpark+pop off queue
else {
Object next = lpNext(head);
if (tail != lvTail() || head != lvHead()) { // LoadLoad
// inconsistent read
continue;
}
thread = lpThread(head);
if (isConsumer) {
Object returnVal;
while (true) {
returnVal = lpItem1(head);
// is already cancelled/fulfilled
if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier
// move head forward to look for next "ready" node
if (advanceHead(head, next)) { // FULL barrier
head = next;
next = lpNext(head);
}
thread = lpThread(head);
busySpin();
continue;
}
break;
}
spIsReady(head, false);
LockSupport.unpark((Thread) thread);
advanceHead(head, next);
return returnVal;
} else {
while (true) {
soItem1(head, item); // StoreStore
// is already cancelled/fulfilled
if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier
// move head forward to look for next "ready" node
if (advanceHead(head, next)) { // FULL barrier
head = next;
next = lpNext(head);
}
thread = lpThread(head);
busySpin();
continue;
}
break;
}
spIsReady(head, false);
LockSupport.unpark((Thread) thread);
advanceHead(head, next);
return null;
}
}
}
}
@ -305,12 +236,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
}
/**
* @param myThread
* @return
* @return false if we were interrupted, true if we were unparked by another thread
*/
private final boolean park(Object myNode, long myOffset, boolean timed, long nanos) throws InterruptedException {
private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException {
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = maxUntimedSpins;
@ -329,8 +255,8 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
// busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary)
for (;;) {
if (lvElementThread(myOffset) == null) {
return true;
if (lvThread(myNode) == null) {
return;
} else if (spins > 0) {
--spins;
} else if (spins > negMaxUntimedSpins) {
@ -341,41 +267,22 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
LockSupport.park();
if (myThread.isInterrupted()) {
casElementThread(myOffset, myThread, null);
return false;
casThread(myNode, myThread, null);
Thread.interrupted();
throw new InterruptedException();
}
}
}
}
@Override
public boolean offer(Node message) {
return false;
}
@Override
public Node poll() {
return null;
}
@Override
public Node peek() {
return null;
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() == lvProducerIndex();
}
// @Override
// public boolean isEmpty() {
// // Order matters!
// // Loading consumer before producer allows for producer increments after consumer index is read.
// // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// // nothing we can do to make this an exact method.
// return lvConsumerIndex() == lvProducerIndex();
// }
public boolean hasPendingMessages() {
// count the number of consumers waiting, it should be the same as the number of threads configured