diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java new file mode 100644 index 0000000..ccf222d --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java @@ -0,0 +1,137 @@ +package dorkbox.util.messagebus.common.simpleq; + +import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; + +public class LinkedArrayList { + + private static final long HEAD; + private static final long TAIL; + + private static final long NEXT; + private static final long IS_CONSUMER; + private static final long IS_READY; + private static final long THREAD; + + private static final long ITEM1; + + static { + try { + HEAD = UNSAFE.objectFieldOffset(LinkedArrayList.class.getField("head")); + TAIL = UNSAFE.objectFieldOffset(LinkedArrayList.class.getField("tail")); + + NEXT = UNSAFE.objectFieldOffset(Node.class.getField("next")); + IS_CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); + IS_READY = UNSAFE.objectFieldOffset(Node.class.getField("isReady")); + THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); + ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + static final void soItem1(Object node, Object item) { + UNSAFE.putOrderedObject(node, ITEM1, item); + } + + static final void spItem1(Object node, Object item) { + UNSAFE.putObject(node, ITEM1, item); + } + + static final Object lvItem1(Object node) { + return UNSAFE.getObjectVolatile(node, ITEM1); + } + + static final Object lpItem1(Object node) { + return UNSAFE.getObject(node, ITEM1); + } + + final Object lvHead() { + return UNSAFE.getObjectVolatile(this, HEAD); + } + + final Object lpHead() { + return UNSAFE.getObject(this, HEAD); + } + + final Object lvTail() { + return UNSAFE.getObjectVolatile(this, TAIL); + } + + final Object lpTail() { + return UNSAFE.getObject(this, TAIL); + } + + final Object lpNext(Object node) { + return UNSAFE.getObject(node, NEXT); + } + + final boolean advanceTail(Object expected, Object newTail) { + if (expected == lvTail()) { + return UNSAFE.compareAndSwapObject(this, TAIL, expected, newTail); + } + return false; + } + + final boolean advanceHead(Object expected, Object newHead) { + if (expected == lvHead()) { + return UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead); + } + return false; + } + + final Object lvThread(Object node) { + return UNSAFE.getObjectVolatile(node, THREAD); + } + + final Object lpThread(Object node) { + return UNSAFE.getObject(node, THREAD); + } + + final void spThread(Object node, Thread thread) { + UNSAFE.putObject(node, THREAD, thread); + } + + final boolean casThread(Object node, Object expected, Object newThread) { + return UNSAFE.compareAndSwapObject(node, THREAD, expected, newThread); + } + + final boolean lpType(Object node) { + return UNSAFE.getBoolean(node, IS_CONSUMER); + } + + final void spType(Object node, boolean isConsumer) { + UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer); + } + + final boolean lpIsReady(Object node) { + return UNSAFE.getBoolean(node, IS_READY); + } + + final void spIsReady(Object node, boolean isReady) { + UNSAFE.putBoolean(node, IS_READY, isReady); + } + + + + public Node head; + public Node tail; + + private Node[] buffer; + + public LinkedArrayList(int size) { + this.buffer = new Node[size]; + + // pre-fill our data structures. This just makes sure to have happy memory. we use linkedList style iteration + for (int i=0; i < size; i++) { + this.buffer[i] = new Node(); + if (i > 0) { + this.buffer[i-1].next = this.buffer[i]; + } + } + + this.buffer[size-1].next = this.buffer[0]; + + this.tail = this.buffer[0]; + this.head = this.tail.next; + } +} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 8c3f8a5..0441214 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,13 +1,20 @@ package dorkbox.util.messagebus.common.simpleq; +import java.util.concurrent.atomic.AtomicInteger; + // mpmc sparse.shift = 2, for this to be fast. abstract class PrePad { -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class ColdItems { + private static AtomicInteger count = new AtomicInteger(); + public final int ID = count.getAndIncrement(); + + public boolean isReady = false; + public boolean isConsumer = false; // public short type = MessageType.ONE; public Object item1 = null; // public Object item2 = null; @@ -16,17 +23,30 @@ abstract class ColdItems { } abstract class Pad0 extends ColdItems { -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; } -abstract class HotItem1 extends ColdItems { -// public volatile Thread thread; +abstract class HotItem1 extends Pad0 { + public volatile Thread thread; } -public class Node extends HotItem1 { +abstract class Pad1 extends HotItem1 { + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class HotItem2 extends Pad1 { + public volatile Node next; +} + +public class Node extends HotItem2 { // post-padding -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { } + + @Override + public String toString() { + return "[" + this.ID + "]"; + } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index c5751ac..41bee02 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -1,40 +1,9 @@ package dorkbox.util.messagebus.common.simpleq; -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; - -public final class SimpleQueue extends MpmcArrayQueueConsumerField { - - private static final long ITEM1_OFFSET; - - static { - try { - ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - private static final void soItem1(Object node, Object item) { - UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item); - } - - private static final void spItem1(Object node, Object item) { - UNSAFE.putObject(node, ITEM1_OFFSET, item); - } - - private static final Object lvItem1(Object node) { - return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET); - } - - private static final Object lpItem1(Object node) { - return UNSAFE.getObject(node, ITEM1_OFFSET); - } - +public final class SimpleQueue extends LinkedArrayList { /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -76,18 +45,6 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { public SimpleQueue(final int size) { super(size); - - // pre-fill our data structures - - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - long currentProducerIndex; - - for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) { - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long elementOffset = calcElementOffset(currentProducerIndex, mask); - soElement(elementOffset, new Node()); - } } /** @@ -106,190 +63,164 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException { - - final Boolean isConsumer = Boolean.valueOf(item == null); - final boolean consumerBoolValue = isConsumer.booleanValue(); - - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long capacity = this.mask + 1; - final long[] sBuffer = this.sequenceBuffer; - - // values we shouldn't reach - long currentConsumerIndex = -1; - long currentProducerIndex = Long.MAX_VALUE; - - long cSeqOffset; - long pSeqOffset; - - long pSeq; - long pDelta = -1; - - boolean sameMode = false; - boolean empty = false; + final boolean isConsumer= item == null; while (true) { - currentProducerIndex = lvProducerIndex(); // LoadLoad + // empty or same mode = push+park onto queue + // complimentary mode = unpark+pop off queue - // empty or same mode - // push+park onto queue + Object tail = lvTail(); // LoadLoad + Object head = lvHead(); // LoadLoad + Object thread; - // we add ourselves to the queue and check status (maybe we park OR we undo, pop-consumer, and unpark) - pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); - pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - pDelta = pSeq - currentProducerIndex; - if (pDelta == 0) { - // this is expected if we see this first time around - final long nextProducerIndex = currentProducerIndex + 1; - if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { - // Successful CAS: full barrier + // it is possible that two threads check the queue at the exact same time, + // BOTH can think that the queue is empty, resulting in a deadlock between threads + // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when + // in reality, it is. + boolean empty = head == lpNext(tail); + boolean sameMode = lpType(tail) == isConsumer; + // empty or same mode = push+park onto queue + if (empty || sameMode) { + if (timed && nanos <= 0) { + // can't wait + return null; + } - // it is possible that two threads check the queue at the exact same time, - // BOTH can think that the queue is empty, resulting in a deadlock between threads - // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when - // in reality, it is. - currentConsumerIndex = lvConsumerIndex(); - empty = currentProducerIndex == currentConsumerIndex; + final Object tNext = lpNext(tail); + if (tail != lvTail()) { // LoadLoad + // inconsistent read + busySpin(); + continue; + } - if (!empty) { - final long previousProducerIndex = currentProducerIndex - 1; - - final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask); - final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad - final long ppDelta = ppSeq - previousProducerIndex; - - if (ppDelta == 1) { - // same mode check - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(previousProducerIndex, mask); - sameMode = lpElementType(offset) == isConsumer; - } + thread = lpThread(head); + if (thread == null) { + if (sameMode) { + busySpin(); + continue; } - - if (empty || sameMode) { - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentProducerIndex, mask); - spElementType(offset, isConsumer); - spElementThread(offset, Thread.currentThread()); - - final Object element = lpElement(offset); - if (consumerBoolValue) { - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore - - // now we wait - park(element, offset, timed, nanos); - return lvItem1(element); - } else { - spItem1(element, item); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore - - // now we wait - park(element, offset, timed, nanos); - return null; - } - } else { - // complimentary mode - - // undo my push, since we will be poping off the queue instead - casProducerIndex(nextProducerIndex, currentProducerIndex); - - while (true) { - // get item - cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long nextConsumerIndex = currentConsumerIndex + 1; - final long cDelta = cSeq - nextConsumerIndex; - - if (cDelta == 0) { - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); - final Thread thread = lpElementThread(offset); - - if (consumerBoolValue) { - if (thread == null || // is cancelled/fulfilled already - !casElementThread(offset, thread, null)) { // failed cas state - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - } - - continue; - } - - // success - final Object element = lpElement(offset); - Object item1 = lpItem1(element); - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - LockSupport.unpark(thread); - - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - return item1; - } - - busySpin(); - // failed CAS - continue; - } else { - final Object element = lpElement(offset); - soItem1(element, item); - - if (thread == null || // is cancelled/fulfilled already - !casElementThread(offset, thread, null)) { // failed cas state - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - } - - // lost CAS - busySpin(); - continue; - } - - // success - - // pop off queue - if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { - // Successful CAS: full barrier - - LockSupport.unpark(thread); - soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore - - return null; - } - - // lost CAS - busySpin(); - continue; - } - } - - } + } else { + if (empty) { + busySpin(); + continue; } } - // failed cas, retry 1 - } else if (pDelta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex - currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - // return; - } - // contention. - busySpin(); +// if (sameMode && !lpIsReady(tNext)) { +// // A "node" is only ready (and valid for a "isConsumer check") once the "isReady" has been set. +// continue; +// } else if (empty && lpIsReady(tNext)) { +// // A "node" is only empty (and valid for a "isEmpty check") if the head node "isReady" has not been set (otherwise, head is still in progress) +// continue; +// } + + + if (isConsumer) { + spType(tNext, isConsumer); + spIsReady(tNext, true); + + spThread(tNext, Thread.currentThread()); + + if (!advanceTail(tail, tNext)) { // FULL barrier + // failed to link in + busySpin(); + continue; + } + + park(tNext, timed, nanos); + + // this will only advance head if necessary + advanceHead(tail, tNext); + return lvItem1(tNext); + } else { + spType(tNext, isConsumer); + spItem1(tNext, item); + spIsReady(tNext, true); + + spThread(tNext, Thread.currentThread()); + + if (!advanceTail(tail, tNext)) { // FULL barrier + // failed to link in + busySpin(); + continue; + } + + park(tNext, timed, nanos); + + // this will only advance head if necessary + advanceHead(tail, tNext); + return null; + } + } + // complimentary mode = unpark+pop off queue + else { + Object next = lpNext(head); + + if (tail != lvTail() || head != lvHead()) { // LoadLoad + // inconsistent read + continue; + } + + thread = lpThread(head); + if (isConsumer) { + Object returnVal; + + while (true) { + returnVal = lpItem1(head); + + // is already cancelled/fulfilled + if (thread == null || + !casThread(head, thread, null)) { // FULL barrier + + // move head forward to look for next "ready" node + if (advanceHead(head, next)) { // FULL barrier + head = next; + next = lpNext(head); + } + + thread = lpThread(head); + busySpin(); + continue; + } + + break; + } + + spIsReady(head, false); + LockSupport.unpark((Thread) thread); + + advanceHead(head, next); + return returnVal; + } else { + while (true) { + soItem1(head, item); // StoreStore + + // is already cancelled/fulfilled + if (thread == null || + !casThread(head, thread, null)) { // FULL barrier + + // move head forward to look for next "ready" node + if (advanceHead(head, next)) { // FULL barrier + head = next; + next = lpNext(head); + } + + thread = lpThread(head); + busySpin(); + continue; + } + + break; + } + + spIsReady(head, false); + LockSupport.unpark((Thread) thread); + + advanceHead(head, next); + return null; + } + } } } @@ -305,12 +236,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } } - /** - * @param myThread - * @return - * @return false if we were interrupted, true if we were unparked by another thread - */ - private final boolean park(Object myNode, long myOffset, boolean timed, long nanos) throws InterruptedException { + private final void park(Object myNode, boolean timed, long nanos) throws InterruptedException { // long lastTime = timed ? System.nanoTime() : 0; // int spins = timed ? maxTimedSpins : maxUntimedSpins; int spins = maxUntimedSpins; @@ -329,8 +255,8 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { // busy spin for the amount of time (roughly) of a CPU context switch // then park (if necessary) for (;;) { - if (lvElementThread(myOffset) == null) { - return true; + if (lvThread(myNode) == null) { + return; } else if (spins > 0) { --spins; } else if (spins > negMaxUntimedSpins) { @@ -341,41 +267,22 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { LockSupport.park(); if (myThread.isInterrupted()) { - casElementThread(myOffset, myThread, null); - return false; + casThread(myNode, myThread, null); + Thread.interrupted(); + throw new InterruptedException(); } } } } - @Override - public boolean offer(Node message) { - return false; - } - - @Override - public Node poll() { - return null; - } - - @Override - public Node peek() { - return null; - } - - @Override - public int size() { - return 0; - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() == lvProducerIndex(); - } +// @Override +// public boolean isEmpty() { +// // Order matters! +// // Loading consumer before producer allows for producer increments after consumer index is read. +// // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is +// // nothing we can do to make this an exact method. +// return lvConsumerIndex() == lvProducerIndex(); +// } public boolean hasPendingMessages() { // count the number of consumers waiting, it should be the same as the number of threads configured