diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java index ccf222d..0182a51 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/LinkedArrayList.java @@ -2,14 +2,33 @@ package dorkbox.util.messagebus.common.simpleq; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; -public class LinkedArrayList { + +abstract class HotItemA1 extends PrePad { + public volatile Node head; +} + +abstract class PadA0 extends HotItemA1 { + volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class HotItemA2 extends PadA0 { + public volatile Node tail; +} + +abstract class PadA1 extends HotItemA2 { + volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + + +public class LinkedArrayList extends PadA1 { private static final long HEAD; private static final long TAIL; private static final long NEXT; private static final long IS_CONSUMER; - private static final long IS_READY; private static final long THREAD; private static final long ITEM1; @@ -21,7 +40,6 @@ public class LinkedArrayList { NEXT = UNSAFE.objectFieldOffset(Node.class.getField("next")); IS_CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); - IS_READY = UNSAFE.objectFieldOffset(Node.class.getField("isReady")); THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1")); } catch (NoSuchFieldException e) { @@ -66,17 +84,17 @@ public class LinkedArrayList { } final boolean advanceTail(Object expected, Object newTail) { - if (expected == lvTail()) { +// if (expected == lvTail()) { return UNSAFE.compareAndSwapObject(this, TAIL, expected, newTail); - } - return false; +// } +// return false; } final boolean advanceHead(Object expected, Object newHead) { - if (expected == lvHead()) { +// if (expected == lvHead()) { return UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead); - } - return false; +// } +// return false; } final Object lvThread(Object node) { @@ -103,35 +121,28 @@ public class LinkedArrayList { UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer); } - final boolean lpIsReady(Object node) { - return UNSAFE.getBoolean(node, IS_READY); - } + final static int SPARSE = 1; - final void spIsReady(Object node, boolean isReady) { - UNSAFE.putBoolean(node, IS_READY, isReady); - } - - - - public Node head; - public Node tail; - - private Node[] buffer; public LinkedArrayList(int size) { - this.buffer = new Node[size]; + size = size*SPARSE; + + Node[] buffer = new Node[size]; // pre-fill our data structures. This just makes sure to have happy memory. we use linkedList style iteration - for (int i=0; i < size; i++) { - this.buffer[i] = new Node(); - if (i > 0) { - this.buffer[i-1].next = this.buffer[i]; + int previous = -1; + int current = 0; + for (; current < size; current+=SPARSE) { + buffer[current] = new Node(); + if (current > 0) { + buffer[previous].next = buffer[current]; } + previous = current; } - this.buffer[size-1].next = this.buffer[0]; + buffer[previous].next = buffer[0]; - this.tail = this.buffer[0]; + this.tail = buffer[0]; this.head = this.tail.next; } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 0441214..f1cfb29 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,52 +1,62 @@ package dorkbox.util.messagebus.common.simpleq; -import java.util.concurrent.atomic.AtomicInteger; // mpmc sparse.shift = 2, for this to be fast. abstract class PrePad { + volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class ColdItems { - private static AtomicInteger count = new AtomicInteger(); - public final int ID = count.getAndIncrement(); +// private static AtomicInteger count = new AtomicInteger(); +// public final int ID = count.getAndIncrement(); - public boolean isReady = false; - public boolean isConsumer = false; // public short type = MessageType.ONE; - public Object item1 = null; + public volatile Object item1 = null; // public Object item2 = null; // public Object item3 = null; // public Object[] item4 = null; } abstract class Pad0 extends ColdItems { + volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem1 extends Pad0 { - public volatile Thread thread; + public volatile boolean isConsumer = false; } abstract class Pad1 extends HotItem1 { + volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem2 extends Pad1 { + public volatile Thread thread; +} + +abstract class Pad2 extends HotItem2 { + volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class HotItem3 extends Pad2 { public volatile Node next; } -public class Node extends HotItem2 { +public class Node extends HotItem3 { // post-padding + volatile long y0, y1, y2, y4, y5, y6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { } - @Override - public String toString() { - return "[" + this.ID + "]"; - } +// @Override +// public String toString() { +// return "[" + this.ID + "]"; +// } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index 41bee02..84f5c06 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -31,7 +31,7 @@ public final class SimpleQueue extends LinkedArrayList { * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. */ - static final int maxUntimedSpins = maxTimedSpins * 16; + static final int maxUntimedSpins = maxTimedSpins * 64; static final int negMaxUntimedSpins = -maxUntimedSpins; /** @@ -69,16 +69,16 @@ public final class SimpleQueue extends LinkedArrayList { // empty or same mode = push+park onto queue // complimentary mode = unpark+pop off queue - Object tail = lvTail(); // LoadLoad - Object head = lvHead(); // LoadLoad + final Object tail = lvTail(); // LoadLoad + final Object head = lvHead(); // LoadLoad Object thread; // it is possible that two threads check the queue at the exact same time, // BOTH can think that the queue is empty, resulting in a deadlock between threads // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when // in reality, it is. - boolean empty = head == lpNext(tail); - boolean sameMode = lpType(tail) == isConsumer; + final boolean empty = head == lpNext(tail); + final boolean sameMode = lpType(tail) == isConsumer; // empty or same mode = push+park onto queue if (empty || sameMode) { @@ -95,31 +95,20 @@ public final class SimpleQueue extends LinkedArrayList { } thread = lpThread(head); - if (thread == null) { - if (sameMode) { + if (thread != null) { + if (empty) { busySpin(); continue; } } else { - if (empty) { + if (sameMode) { busySpin(); continue; } } -// if (sameMode && !lpIsReady(tNext)) { -// // A "node" is only ready (and valid for a "isConsumer check") once the "isReady" has been set. -// continue; -// } else if (empty && lpIsReady(tNext)) { -// // A "node" is only empty (and valid for a "isEmpty check") if the head node "isReady" has not been set (otherwise, head is still in progress) -// continue; -// } - - if (isConsumer) { spType(tNext, isConsumer); - spIsReady(tNext, true); - spThread(tNext, Thread.currentThread()); if (!advanceTail(tail, tNext)) { // FULL barrier @@ -131,12 +120,11 @@ public final class SimpleQueue extends LinkedArrayList { park(tNext, timed, nanos); // this will only advance head if necessary - advanceHead(tail, tNext); - return lvItem1(tNext); +// advanceHead(tail, tNext); + return lpItem1(tNext); } else { spType(tNext, isConsumer); spItem1(tNext, item); - spIsReady(tNext, true); spThread(tNext, Thread.currentThread()); @@ -149,7 +137,7 @@ public final class SimpleQueue extends LinkedArrayList { park(tNext, timed, nanos); // this will only advance head if necessary - advanceHead(tail, tNext); +// advanceHead(tail, tNext); return null; } } @@ -159,14 +147,14 @@ public final class SimpleQueue extends LinkedArrayList { if (tail != lvTail() || head != lvHead()) { // LoadLoad // inconsistent read + busySpin(); continue; } thread = lpThread(head); if (isConsumer) { Object returnVal; - - while (true) { +// while (true) { returnVal = lpItem1(head); // is already cancelled/fulfilled @@ -174,50 +162,50 @@ public final class SimpleQueue extends LinkedArrayList { !casThread(head, thread, null)) { // FULL barrier // move head forward to look for next "ready" node - if (advanceHead(head, next)) { // FULL barrier - head = next; - next = lpNext(head); + if (!advanceHead(head, next)) { // FULL barrier + busySpin(); +// head = next; +// next = lpNext(head); } - thread = lpThread(head); - busySpin(); +// thread = lpThread(head); + continue; } - break; - } +// break; +// } - spIsReady(head, false); LockSupport.unpark((Thread) thread); - advanceHead(head, next); + advanceHead(head, next); // FULL barrier return returnVal; } else { - while (true) { - soItem1(head, item); // StoreStore +// while (true) { + spItem1(head, item); // StoreStore // is already cancelled/fulfilled if (thread == null || !casThread(head, thread, null)) { // FULL barrier // move head forward to look for next "ready" node - if (advanceHead(head, next)) { // FULL barrier - head = next; - next = lpNext(head); + if (!advanceHead(head, next)) { // FULL barrier +// head = next; +// next = lpNext(head); + busySpin(); } - thread = lpThread(head); - busySpin(); +// thread = lpThread(head); + continue; } - break; - } +// break; +// } - spIsReady(head, false); LockSupport.unpark((Thread) thread); - advanceHead(head, next); + advanceHead(head, next); // FULL barrier return null; } } @@ -292,6 +280,5 @@ public final class SimpleQueue extends LinkedArrayList { public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub - } } diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java index 1b6cff3..0ead259 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java @@ -45,7 +45,7 @@ public class SimpleQueueAltPerfTest { for (int i = 10; i < 20; i++) { sum += results[i]; } - System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); } private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception { @@ -65,6 +65,7 @@ public class SimpleQueueAltPerfTest { long duration = end - p.start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); + System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); return ops; }