diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java index ee56971..603cf51 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java @@ -1,46 +1,48 @@ package dorkbox.util.messagebus.common.simpleq; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; + +import java.util.concurrent.atomic.AtomicReference; + import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; +import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField { - private final static long THREAD; - private static final long TYPE_OFFSET; +// private final static long THREAD; +// private static final long TYPE_OFFSET; private static final long ITEM1_OFFSET; static { try { - TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("nodeType")); +// TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); - THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); +// THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } } - private static final void setMessage1(Object node, Object item) { + private static final void spItem1(Object node, Object item) { UNSAFE.putObject(node, ITEM1_OFFSET, item); } - private static final Object getMessage1(Object node) { + private static final Object lpItem1(Object node) { return UNSAFE.getObject(node, ITEM1_OFFSET); } - public final Thread get() { - return (Thread) UNSAFE.getObject(this, THREAD); - } - - - public final void set(Thread newValue) { - UNSAFE.putOrderedObject(this, THREAD, newValue); - } - - protected final boolean compareAndSet(Object expect, Object newValue) { - return UNSAFE.compareAndSwapObject(this, THREAD, expect, newValue); - } - - +// public final Thread get() { +// return (Thread) UNSAFE.getObject(this, THREAD); +// } +// +// +// public final void set(Thread newValue) { +// UNSAFE.putOrderedObject(this, THREAD, newValue); +// } +// +// protected final boolean compareAndSet(Object expect, Object newValue) { +// return UNSAFE.compareAndSwapObject(this, THREAD, expect, newValue); +// } /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -52,7 +54,29 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. * The value here is approximately the average of those across a range of tested systems. */ - private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 + private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000 + + /** The number of slots in the elimination array. */ + private static final int ARENA_LENGTH = Pow2.roundToPowerOfTwo((NCPU + 1) / 2); + + /** The mask value for indexing into the arena. */ + private static int ARENA_MASK = ARENA_LENGTH - 1; + + /** The number of times to step ahead, probe, and try to match. */ + private static final int LOOKAHEAD = Math.min(4, NCPU); + + /** The number of times to spin per lookahead step */ + private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD; + + /** A marker indicating that the arena slot is free. */ + private static final Object FREE = null; + + /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ + private static final Object WAITER = new Object(); + + + /** The arena where slots can be used to perform an exchange. */ + private final AtomicReference[] arena; long p40, p41, p42, p43, p44, p45, p46; @@ -73,19 +97,25 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField final long elementOffset = calcElementOffset(currentProducerIndex, mask); soElement(elementOffset, new Node()); } + + this.arena = new PaddedAtomicReference[ARENA_LENGTH]; + for (int i = 0; i < ARENA_LENGTH; i++) { + this.arena[i] = new PaddedAtomicReference(); + } } /** * PRODUCER */ public void put(Object item) { + final Thread thread = Thread.currentThread(); // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = this.mask + 1; final long[] sBuffer = this.sequenceBuffer; - long currentConsumerIndex; +// long currentConsumerIndex; long currentProducerIndex; long pSeqOffset; long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it @@ -95,7 +125,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField // Loading consumer before producer allows for producer increments after consumer index is read. // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is // nothing we can do to make this an exact method. - currentConsumerIndex = lvConsumerIndex(); // LoadLoad +// currentConsumerIndex = lvConsumerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); @@ -109,9 +139,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentProducerIndex, mask); - Object lpElement = lpElement(offset); - setMessage1(lpElement, item); -// spElement(offset, item); + Object lpElement = lpElementNoCast(offset); + spItem1(lpElement, item); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) @@ -125,21 +154,21 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex // Extra check required to ensure [Queue.offer == false iff queue is full] // return null; - busySpin(); } // another producer has moved the sequence by one, retry 2 // only producer will busySpin if contention -// busySpin(); + busySpin(); } } /** * CONSUMER - * @return null iff empty */ public Object take() { + final Thread thread = Thread.currentThread(); + // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long[] sBuffer = this.sequenceBuffer; @@ -168,8 +197,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentConsumerIndex, mask); - final Object e = lpElement(offset); - Object item = getMessage1(e); + final Object e = lpElementNoCast(offset); + Object item = lpItem1(e); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) @@ -184,7 +213,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] // return null; - // contention. we WILL have data in the Q, we just got to it too quickly + // contention. busySpin(); } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 97150d6..3308355 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -6,15 +6,15 @@ package dorkbox.util.messagebus.common.simpleq; abstract class PrePad { // volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } -abstract class ColdItems { +abstract class ColdItems extends PrePad { // private static AtomicInteger count = new AtomicInteger(); // public final int ID = count.getAndIncrement(); // public short type = MessageType.ONE; - public transient volatile Object item1 = null; + public Object item1 = null; // public Object item2 = null; // public Object item3 = null; // public Object[] item4 = null; @@ -22,35 +22,35 @@ abstract class ColdItems { abstract class Pad0 extends ColdItems { // volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem1 extends Pad0 { - public transient volatile boolean isConsumer = false; +// public transient volatile boolean isConsumer = false; } abstract class Pad1 extends HotItem1 { // volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem2 extends Pad1 { - public transient volatile Thread thread; +// public transient volatile Thread thread; } abstract class Pad2 extends HotItem2 { // volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem3 extends Pad2 { - public transient volatile Node next; +// public transient volatile Node next; } public class Node extends HotItem3 { // post-padding // volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java index 2d598af..65a4e30 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java @@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue imp * @param */ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); + protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); protected static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; @@ -186,6 +186,10 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular return lpElement(this.buffer, offset); } + protected final Object lpElementNoCast(long offset) { + return lpElementNoCast(this.buffer, offset); + } + /** * A plain load (no ordering/fences) of an element from a given offset. * @@ -198,6 +202,17 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular return (E) UNSAFE.getObject(buffer, offset); } + /** + * A plain load (no ordering/fences) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final Object lpElementNoCast(E[] buffer, long offset) { + return UNSAFE.getObject(buffer, offset); + } + /** * A plain load (no ordering/fences) of an element TYPE from a given offset. * diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java index 45542bb..994b7b1 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java @@ -18,12 +18,12 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; +import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue; import dorkbox.util.messagebus.common.simpleq.Node; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; public class MpmcQueueAltPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); @@ -33,7 +33,7 @@ public class MpmcQueueAltPerfTest { System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); + final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -48,20 +48,16 @@ public class MpmcQueueAltPerfTest { System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); } - private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { + private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception { Producer p = new Producer(queue); Thread thread = new Thread(p); thread.start(); // producer will timestamp start - MpmcArrayQueue consumer = queue; - Node result; + MpmcExchangerQueue consumer = queue; + Object result; int i = REPETITIONS; - int queueEmpty = 0; do { - while (null == (result = consumer.poll())) { - queueEmpty++; - Thread.yield(); - } + result = consumer.take(); } while (0 != --i); long end = System.nanoTime(); @@ -69,45 +65,27 @@ public class MpmcQueueAltPerfTest { long duration = end - p.start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, - qName, result.item1, queueEmpty, p.queueFull); + System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); return ops; } - @SuppressWarnings("rawtypes") public static class Producer implements Runnable { - private final MpmcArrayQueue queue; - int queueFull = 0; + private final MpmcExchangerQueue queue; long start; - public Producer(MpmcArrayQueue queue) { + public Producer(MpmcExchangerQueue queue) { this.queue = queue; } @Override public void run() { - MpmcArrayQueue producer = this.queue; + MpmcExchangerQueue producer = this.queue; int i = REPETITIONS; - int f = 0; long s = System.nanoTime(); - MpmcArrayQueue pool = new MpmcArrayQueue(2); - pool.offer(new Node()); - pool.offer(new Node()); - - Node node; do { - node = pool.poll(); - node.item1 = TEST_VALUE; - - while (!producer.offer(node)) { - Thread.yield(); - f++; - } - - pool.offer(node); + producer.put(TEST_VALUE); } while (0 != --i); - this.queueFull = f; this.start = s; } }