diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java index 0182a51..9bd18b5 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java @@ -65,6 +65,7 @@ public class LinkedArrayList extends PadA1 { final Object lvHead() { return UNSAFE.getObjectVolatile(this, HEAD); +// return this.head; } final Object lpHead() { @@ -76,48 +77,51 @@ public class LinkedArrayList extends PadA1 { } final Object lpTail() { - return UNSAFE.getObject(this, TAIL); + return lpTail(this); } - final Object lpNext(Object node) { + static final Object lpTail(Object source) { + return UNSAFE.getObject(source, TAIL); + } + + static final Object lpNext(Object node) { return UNSAFE.getObject(node, NEXT); } final boolean advanceTail(Object expected, Object newTail) { -// if (expected == lvTail()) { - return UNSAFE.compareAndSwapObject(this, TAIL, expected, newTail); -// } -// return false; + final Object that = this; + return advanceTail(that, lpTail(that), expected, newTail); + } + + static final boolean advanceTail(Object source, Object objectActualTail, Object expected, Object newTail) { + return expected == objectActualTail && UNSAFE.compareAndSwapObject(source, TAIL, expected, newTail); } final boolean advanceHead(Object expected, Object newHead) { -// if (expected == lvHead()) { - return UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead); -// } -// return false; + return expected == lpHead() && UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead); } - final Object lvThread(Object node) { + static final Object lvThread(Object node) { return UNSAFE.getObjectVolatile(node, THREAD); } - final Object lpThread(Object node) { + static final Object lpThread(Object node) { return UNSAFE.getObject(node, THREAD); } - final void spThread(Object node, Thread thread) { + static final void spThread(Object node, Thread thread) { UNSAFE.putObject(node, THREAD, thread); } - final boolean casThread(Object node, Object expected, Object newThread) { + static final boolean casThread(Object node, Object expected, Object newThread) { return UNSAFE.compareAndSwapObject(node, THREAD, expected, newThread); } - final boolean lpType(Object node) { + static final boolean lpType(Object node) { return UNSAFE.getBoolean(node, IS_CONSUMER); } - final void spType(Object node, boolean isConsumer) { + static final void spType(Object node, boolean isConsumer) { UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer); } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index f1cfb29..97150d6 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -5,7 +5,7 @@ package dorkbox.util.messagebus.common.simpleq; // mpmc sparse.shift = 2, for this to be fast. abstract class PrePad { - volatile long y0, y1, y2, y4, y5, y6 = 7L; +// volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } @@ -14,42 +14,42 @@ abstract class ColdItems { // public final int ID = count.getAndIncrement(); // public short type = MessageType.ONE; - public volatile Object item1 = null; + public transient volatile Object item1 = null; // public Object item2 = null; // public Object item3 = null; // public Object[] item4 = null; } abstract class Pad0 extends ColdItems { - volatile long y0, y1, y2, y4, y5, y6 = 7L; +// volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem1 extends Pad0 { - public volatile boolean isConsumer = false; + public transient volatile boolean isConsumer = false; } abstract class Pad1 extends HotItem1 { - volatile long y0, y1, y2, y4, y5, y6 = 7L; +// volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem2 extends Pad1 { - public volatile Thread thread; + public transient volatile Thread thread; } abstract class Pad2 extends HotItem2 { - volatile long y0, y1, y2, y4, y5, y6 = 7L; +// volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem3 extends Pad2 { - public volatile Node next; + public transient volatile Node next; } public class Node extends HotItem3 { // post-padding - volatile long y0, y1, y2, y4, y5, y6 = 7L; +// volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index 84f5c06..a0adb08 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -31,7 +31,7 @@ public final class SimpleQueue extends LinkedArrayList { * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. */ - static final int maxUntimedSpins = maxTimedSpins * 64; + static final int maxUntimedSpins = maxTimedSpins * 32; static final int negMaxUntimedSpins = -maxUntimedSpins; /** @@ -77,8 +77,17 @@ public final class SimpleQueue extends LinkedArrayList { // BOTH can think that the queue is empty, resulting in a deadlock between threads // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when // in reality, it is. - final boolean empty = head == lpNext(tail); - final boolean sameMode = lpType(tail) == isConsumer; + boolean empty = head == lpNext(tail); + boolean sameMode = lpType(tail) == isConsumer; + + // check to make sure we are not "double dipping" on our head + thread = lpThread(head); + if (empty && thread != null) { + empty = false; + } else if (sameMode && thread == null) { + busySpin(); + continue; + } // empty or same mode = push+park onto queue if (empty || sameMode) { @@ -94,26 +103,13 @@ public final class SimpleQueue extends LinkedArrayList { continue; } - thread = lpThread(head); - if (thread != null) { - if (empty) { - busySpin(); - continue; - } - } else { - if (sameMode) { - busySpin(); - continue; - } - } - if (isConsumer) { spType(tNext, isConsumer); spThread(tNext, Thread.currentThread()); if (!advanceTail(tail, tNext)) { // FULL barrier // failed to link in - busySpin(); +// busySpin(); continue; } @@ -121,23 +117,24 @@ public final class SimpleQueue extends LinkedArrayList { // this will only advance head if necessary // advanceHead(tail, tNext); - return lpItem1(tNext); + Object lpItem1 = lpItem1(tNext); + spItem1(tNext, null); + return lpItem1; } else { + // producer spType(tNext, isConsumer); spItem1(tNext, item); - spThread(tNext, Thread.currentThread()); if (!advanceTail(tail, tNext)) { // FULL barrier // failed to link in - busySpin(); continue; } park(tNext, timed, nanos); // this will only advance head if necessary -// advanceHead(tail, tNext); + advanceHead(tail, tNext); return null; } } @@ -153,55 +150,37 @@ public final class SimpleQueue extends LinkedArrayList { thread = lpThread(head); if (isConsumer) { - Object returnVal; -// while (true) { - returnVal = lpItem1(head); + Object returnVal = lpItem1(head); - // is already cancelled/fulfilled - if (thread == null || - !casThread(head, thread, null)) { // FULL barrier + // is already cancelled/fulfilled + if (thread == null || + !casThread(head, thread, null)) { // FULL barrier - // move head forward to look for next "ready" node - if (!advanceHead(head, next)) { // FULL barrier - busySpin(); -// head = next; -// next = lpNext(head); - } + // head was already used by a different thread + advanceHead(head, next); // FULL barrier -// thread = lpThread(head); - - continue; - } - -// break; -// } + continue; + } + spItem1(head, null); LockSupport.unpark((Thread) thread); - advanceHead(head, next); // FULL barrier + advanceHead(head, next); // FULL barrier + return returnVal; } else { -// while (true) { - spItem1(head, item); // StoreStore + // producer + spItem1(head, item); // StoreStore - // is already cancelled/fulfilled - if (thread == null || - !casThread(head, thread, null)) { // FULL barrier + // is already cancelled/fulfilled + if (thread == null || + !casThread(head, thread, null)) { // FULL barrier - // move head forward to look for next "ready" node - if (!advanceHead(head, next)) { // FULL barrier -// head = next; -// next = lpNext(head); - busySpin(); - } + // head was already used by a different thread + advanceHead(head, next); // FULL barrier -// thread = lpThread(head); - - continue; - } - -// break; -// } + continue; + } LockSupport.unpark((Thread) thread); @@ -212,6 +191,18 @@ public final class SimpleQueue extends LinkedArrayList { } } + private static final void busySpin2() { + // busy spin for the amount of time (roughly) of a CPU context switch + int spins = maxUntimedSpins; + for (;;) { + if (spins > 0) { + --spins; + } else { + break; + } + } + } + private static final void busySpin() { // busy spin for the amount of time (roughly) of a CPU context switch int spins = SPINS; @@ -243,13 +234,13 @@ public final class SimpleQueue extends LinkedArrayList { // busy spin for the amount of time (roughly) of a CPU context switch // then park (if necessary) for (;;) { - if (lvThread(myNode) == null) { + if (lpThread(myNode) == null) { return; - } else if (spins > 0) { - --spins; +// } else if (spins > 0) { +// --spins; } else if (spins > negMaxUntimedSpins) { --spins; - LockSupport.parkNanos(1); +// LockSupport.parkNanos(1); } else { // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. LockSupport.park();