From 72cc6d15d4eb6d0dc56b7618cc607c773f1647e3 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 10 Apr 2015 13:10:10 +0200 Subject: [PATCH] WIP - 64Mop/s w/ node --- .../util/messagebus/MultiMBassador.java | 6 +- .../common/simpleq/MpmcExchangerQueue.java | 40 ++--- .../common/simpleq/MpmcExchangerQueueAlt.java | 51 +++--- .../util/messagebus/common/simpleq/Node.java | 76 ++++++--- .../common/simpleq/SimpleQueue.java | 150 ++++++++++++++++-- .../jctools/ConcurrentCircularArrayQueue.java | 12 +- .../simpleq/jctools/MpmcArrayQueue.java | 16 +- .../util/messagebus/MpmcQueueAltPerfTest.java | 43 ++--- .../messagebus/MpmcQueueBaselinePerfTest.java | 19 ++- .../util/messagebus/PerformanceTest.java | 2 +- 10 files changed, 289 insertions(+), 126 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index db6d0d7..33c8487 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -449,7 +449,7 @@ public class MultiMBassador implements IMessageBus { try { // this.dispatchQueue.transfer(runnable); - this.dispatchQueue.transfer(message); + this.dispatchQueue.put(message); } catch (InterruptedException e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") @@ -470,7 +470,7 @@ public class MultiMBassador implements IMessageBus { }; try { - this.dispatchQueue.transfer(runnable); + this.dispatchQueue.put(runnable); } catch (InterruptedException e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") @@ -492,7 +492,7 @@ public class MultiMBassador implements IMessageBus { try { - this.dispatchQueue.transfer(runnable); + this.dispatchQueue.put(runnable); } catch (InterruptedException e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java index 3925352..73caf5a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java @@ -21,7 +21,6 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField factory, final int size) { super(size); @@ -33,32 +32,23 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField(factory.newInstance())); - } else { - // something is seriously wrong. This should never happen. - throw new RuntimeException("Unable to prefill exchangerQueue"); - } + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long elementOffset = calcElementOffset(currentProducerIndex, mask); + spElement(elementOffset, new Node(factory.newInstance())); } + + pSeqOffset = calcSequenceOffset(0, mask); + soSequence(sBuffer, pSeqOffset, 0); // StoreStore } /** * PRODUCER - * @return null iff queue is full */ public Node put() { // local load of field to avoid repeated loads after volatile reads @@ -76,7 +66,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField extends MpmcArrayQueueConsumerField e = lpElement(offset); // increment sequence by 1, the value expected by consumer @@ -104,8 +93,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField extends MpmcArrayQueueConsumerField take() { // local load of field to avoid repeated loads after volatile reads @@ -135,7 +123,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField extends MpmcArrayQueueConsumerField= pIndex && // test against cached pIndex currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] +// return null; // contention. we WILL have data in the Q, we just got to it too quickly - return null; + busySpin(); } // another consumer beat us and moved sequence ahead, retry 2 @@ -170,7 +160,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField extends MpmcArrayQueueConsumerField> { +public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField { /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -20,19 +20,11 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< long p40, p41, p42, p43, p44, p45, p46; long p30, p31, p32, p33, p34, p35, p36, p37; - private final Node newInstance; - - /** Creates a {@code EliminationStack} that is initially empty. */ - public MpmcExchangerQueueAlt(final HandlerFactory factory, final int size) { + public MpmcExchangerQueueAlt(final int size) { super(size); - - this.newInstance = new Node(factory.newInstance()); - - ////////////// // pre-fill our data structures - ////////////// // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; @@ -45,29 +37,33 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< // on 64bit(no compressed oops) JVM this is the same as seqOffset final long elementOffset = calcElementOffset(currentProducerIndex, mask); - spElement(elementOffset, this.newInstance); + spElement(elementOffset, new Node()); } - - pSeqOffset = calcSequenceOffset(0, mask); - soSequence(sBuffer, pSeqOffset, 0); // StoreStore } /** * PRODUCER * @return null iff queue is full */ - public Node put() { + public void put(Object item) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = this.mask + 1; final long[] sBuffer = this.sequenceBuffer; + long currentConsumerIndex; long currentProducerIndex; long pSeqOffset; long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it while (true) { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. +// currentConsumerIndex = lvConsumerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad + pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad final long delta = seq - currentProducerIndex; @@ -79,13 +75,14 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentProducerIndex, mask); - final Node e = lpElement(offset); + final Node e = lpElement(offset); + e.setMessage1(item); // increment sequence by 1, the value expected by consumer // (seeing this value from a producer will lead to retry 2) soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore - return e; + return; } // failed cas, retry 1 } else if (delta < 0 && // poll has not moved this value forward @@ -107,17 +104,25 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< * CONSUMER * @return null iff empty */ - public Node take() { + public Object take() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long[] sBuffer = this.sequenceBuffer; long currentConsumerIndex; + long currentProducerIndex; long cSeqOffset; long pIndex = -1; // start with bogus value, hope we don't need it while (true) { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. currentConsumerIndex = lvConsumerIndex(); // LoadLoad +// currentProducerIndex = lvProducerIndex(); // LoadLoad + + cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long delta = seq - (currentConsumerIndex + 1); @@ -128,13 +133,13 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentConsumerIndex, mask); - final Node e = lpElement(offset); + final Node e = lpElement(offset); // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore - return e; + return e.getMessage1(); } // failed cas, retry 1 } else if (delta < 0 && // slot has not been moved by producer @@ -165,17 +170,17 @@ public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField< } @Override - public boolean offer(Node message) { + public boolean offer(Node message) { return false; } @Override - public Node poll() { + public Node poll() { return null; } @Override - public Node peek() { + public Node peek() { return null; } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 463e69e..482fbcc 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,33 +1,73 @@ package dorkbox.util.messagebus.common.simpleq; -import java.util.concurrent.atomic.AtomicReference; +import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; + +// also, to increase performance due to false sharing/cache misses -- edit +// the system property "sparse.shift" +// Improve likelihood of isolation on <= 64 byte cache lines -public class Node { +// java classes DO NOT mix their fields, so if we want specific padding, in a specific order, we must +// subclass them. ALl intel CPU L1/2/2 cache line size are all 64 bytes - // Improve likelihood of isolation on <= 64 byte cache lines - // see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html - public long x0, x1, x2, x3, x4, x5, x6, x7; +// see: http://psy-lob-saw.blogspot.de/2013/05/know-thy-java-object-memory-layout.html +// see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html - public volatile E item; +abstract class NodeVal { + public static final short FREE = 0; + public static final short INIT = 1; + public static final short DONE = 2; +} - public long y0, y1, y2, y3, y4, y5, y6, y7; +abstract class NodeState { + /** item stored in the node */ + public volatile short state = NodeVal.FREE; +} + +abstract class NodePad2 extends NodeState { +// volatile long y0, y1, y2, y3, y4, y5, y6 = 7L; +} + +abstract class NodeItem extends NodePad2 { + /** items stored in the node */ +// public volatile MessageType messageType = MessageType.ONE; + + public volatile Object message1 = null; +// public Object message2 = null; +// public Object message3 = null; +// public Object[] messages = null; + +} + +abstract class NodePad3 extends NodeItem { +// volatile long z0, z1, z2, z3, z4, z5, z6 = 7L; +} + +abstract class NodeWaiter extends NodePad3 { /** The Thread waiting to be signaled to wake up*/ - public AtomicReference waiter = new AtomicReference(); +// public AtomicReference waiter = new AtomicReference(); +} - public long z0, z1, z2, z3, z4, z5, z6, z7; - - - public Node(E item) { - this.item = item; +public class Node extends NodeWaiter { + private final static long MESSAGE1_OFFSET; + static { + try { + MESSAGE1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("message1")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } } - // prevent JIT from optimizing away the padding - public final long sum() { - return this.x0 + this.x1 + this.x2 + this.x3 + this.x4 + this.x5 + this.x6 + this.x7 + - this.y0 + this.y1 + this.y2 + this.y3 + this.y4 + this.y5 + this.y6 + this.y7 + - this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7; + public Node() { + } + + public void setMessage1(Object item) { + UNSAFE.putObject(this, MESSAGE1_OFFSET, item); + } + + public Object getMessage1() { + return UNSAFE.getObject(this, MESSAGE1_OFFSET); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index d0c9c30..2cd3f1c 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -1,13 +1,14 @@ package dorkbox.util.messagebus.common.simpleq; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import com.lmax.disruptor.MessageHolder; -public final class SimpleQueue { +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; + +public final class SimpleQueue extends MpmcArrayQueueConsumerField> { static { // Prevent rare disastrous classloading in first call to LockSupport.park. @@ -20,7 +21,6 @@ public final class SimpleQueue { /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); - /** * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging @@ -28,24 +28,119 @@ public final class SimpleQueue { * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. * The value here is approximately the average of those across a range of tested systems. */ - private static final int SPINS = NCPU == 1 ? 0 : 600; + private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 + private static final int SIZE = 1<<14; - private final MpmcExchangerQueueAlt consumersWaiting; - private final MpmcExchangerQueueAlt producersWaiting; + long p40, p41, p42, p43, p44, p45, p46; + long p30, p31, p32, p33, p34, p35, p36, p37; - private final AtomicInteger currentCount = new AtomicInteger(0); private final int numberConsumerThreads; public SimpleQueue(int numberConsumerThreads, HandlerFactory factory) { - + super(SIZE); this.numberConsumerThreads = numberConsumerThreads; - this.consumersWaiting = new MpmcExchangerQueueAlt(factory, 1<<14); - this.producersWaiting = new MpmcExchangerQueueAlt(factory, 1<<14); + + // pre-fill our data structures + + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; + long pSeqOffset; + long currentProducerIndex; + + for (currentProducerIndex = 0; currentProducerIndex < SIZE; currentProducerIndex++) { + pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long elementOffset = calcElementOffset(currentProducerIndex, mask); + spElement(elementOffset, new Node(factory.newInstance())); + } + + pSeqOffset = calcSequenceOffset(0, mask); + soSequence(sBuffer, pSeqOffset, 0); // StoreStore } - public void transfer(Object message1) throws InterruptedException { + /** + * PRODUCER + */ + public Node put() { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final long capacity = this.mask + 1; + final long[] sBuffer = this.sequenceBuffer; + + long currentConsumerIndex; + long currentProducerIndex; + long pSeqOffset; + long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it + + while (true) { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. + currentConsumerIndex = lvConsumerIndex(); // LoadLoad + currentProducerIndex = lvProducerIndex(); // LoadLoad + + pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - currentProducerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentProducerIndex, mask); + final Node e = lpElement(offset); + + // if only item on Q, WAIT + if (currentConsumerIndex == currentProducerIndex && lvConsumerIndex() == lvProducerIndex()) { + + } else { + + } + + + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore + + return e; + } + // failed cas, retry 1 + } else if (delta < 0 && // poll has not moved this value forward + currentProducerIndex - capacity <= cIndex && // test against cached cIndex + currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] +// return null; + busySpin(); + } + + // another producer has moved the sequence by one, retry 2 + + // only producer will busySpin if contention +// busySpin(); + } + } + + + + + + + + + + + + + public void putOLD(Object message1) throws InterruptedException { // decrement count // <0: no consumers available, add to Q, park and wait // >=0: consumers available, get one from the parking lot @@ -87,7 +182,7 @@ public final class SimpleQueue { } } - public void take(MessageHolder item) throws InterruptedException { + public void takeOLD(MessageHolder item) throws InterruptedException { // increment count // >=0: no producers available, park and wait // <0: producers available, get one from the Q @@ -103,6 +198,10 @@ public final class SimpleQueue { if (!park(consumer, myThread)) { throw new InterruptedException(); } + if (consumer.item == null || consumer.item.message1 == null) { + System.err.println("KAPOW"); + } + item.message1 = consumer.item.message1; return; } else { @@ -116,6 +215,10 @@ public final class SimpleQueue { item.message1 = producer.item.message1; unpark(producer, myThread); + if (item.message1 == null) { + System.err.println("KAPOW"); + } + return; } } @@ -217,9 +320,30 @@ public final class SimpleQueue { public boolean hasPendingMessages() { // count the number of consumers waiting, it should be the same as the number of threads configured - return this.consumersWaiting.size() == this.numberConsumerThreads; +// return this.consumersWaiting.size() == this.numberConsumerThreads; + return false; } public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { } + + @Override + public boolean offer(Node message) { + return false; + } + + @Override + public Node poll() { + return null; + } + + @Override + public Node peek() { + return null; + } + + @Override + public int size() { + return 0; + } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java index 106414a..b01801e 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java @@ -19,6 +19,8 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE import java.util.AbstractQueue; import java.util.Iterator; +import dorkbox.util.messagebus.common.simpleq.Node; + abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue implements MessagePassingQueue { long p00, p01, p02, p03, p04, p05, p06, p07; long p30, p31, p32, p33, p34, p35, p36, p37; @@ -27,7 +29,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue imp /** * A concurrent access enabling class used by circular array based queues this class exposes an offset computation * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and - * the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post + * the array is padded on either side to help with False sharing prevention. It is expected that subclasses handle post * padding. *

* Offset calculation is separate from access to enable the reuse of a give compute offset. @@ -41,8 +43,8 @@ abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue imp * @param */ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); - protected static final int BUFFER_PAD = 32; + protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); + protected static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; static { @@ -54,6 +56,8 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular } else { throw new IllegalStateException("Unknown pointer size"); } + + BUFFER_PAD = 128 / scale; // Including the buffer pad in the array base offset REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT); @@ -67,7 +71,7 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular int actualCapacity = Pow2.roundToPowerOfTwo(capacity); this.mask = actualCapacity - 1; // pad data on either end with some empty slots. - this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; + this.buffer = (E[]) new Node[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; } /** diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java index 6003db6..8567516 100755 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java @@ -14,6 +14,7 @@ package dorkbox.util.messagebus.common.simpleq.jctools; + /** * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that * any and all threads may call the offer/poll/peek methods and correctness is maintained.
@@ -34,11 +35,8 @@ package dorkbox.util.messagebus.common.simpleq.jctools; *

  • Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or * equal to the requested capacity. * - * - * @param - * type of the element stored in the {@link java.util.Queue} */ -public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { +public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -67,7 +65,7 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { } @Override - public boolean offer(final E e) { + public boolean offer(final Object e) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; @@ -118,7 +116,7 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { * and must test producer index when next element is not visible. */ @Override - public E poll() { + public Object poll() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long[] sBuffer = this.sequenceBuffer; @@ -139,7 +137,7 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { // on 64bit(no compressed oops) JVM this is the same as seqOffset final long offset = calcElementOffset(currentConsumerIndex, mask); - final E e = lpElement(offset); + final Object e = lpElement(offset); spElement(offset, null); // Move sequence ahead by capacity, preparing it for next offer @@ -162,9 +160,9 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { } @Override - public E peek() { + public Object peek() { long currConsumerIndex; - E e; + Object e; do { currConsumerIndex = lvConsumerIndex(); // other consumers may have grabbed the element, or queue might be empty diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java index 3ea3662..eda1020 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java @@ -15,9 +15,7 @@ */ package dorkbox.util.messagebus; -import dorkbox.util.messagebus.common.simpleq.HandlerFactory; import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueueAlt; -import dorkbox.util.messagebus.common.simpleq.Node; public class MpmcQueueAltPerfTest { // 15 == 32 * 1024 @@ -27,16 +25,22 @@ public class MpmcQueueAltPerfTest { public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static void main(final String[] args) throws Exception { +// long offset = 0; +// try { +// long nextWriteOffset = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("message1")); +// long lastReadOffset = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("waiter")); +// offset = Math.abs(nextWriteOffset - lastReadOffset); +// junit.framework.Assert.assertTrue(offset >= 64); +// +// } catch (NoSuchFieldException | SecurityException e) { +// e.printStackTrace(); +// } +// System.out.println("Checking false sharing. Offset is: " + offset); + + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - HandlerFactory factory = new HandlerFactory() { - @Override - public Integer newInstance() { - return Integer.valueOf(777); - } - }; - - final MpmcExchangerQueueAlt queue = new MpmcExchangerQueueAlt(factory, QUEUE_CAPACITY); + final MpmcExchangerQueueAlt queue = new MpmcExchangerQueueAlt(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -52,15 +56,15 @@ public class MpmcQueueAltPerfTest { } - private static long performanceRun(int runNumber, MpmcExchangerQueueAlt queue) throws Exception { + private static long performanceRun(int runNumber, MpmcExchangerQueueAlt queue) throws Exception { // for (int i=0;i consumer = queue; - Node result; + MpmcExchangerQueueAlt consumer = queue; + Object result; int i = REPETITIONS; int queueEmpty = 0; do { @@ -72,30 +76,29 @@ public class MpmcQueueAltPerfTest { long duration = end - p.start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); - Integer finalMessage = result.item; System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, - qName, finalMessage, queueEmpty, p.queueFull); + qName, result, queueEmpty, p.queueFull); return ops; } public static class Producer implements Runnable { - private final MpmcExchangerQueueAlt queue; + private final MpmcExchangerQueueAlt queue; int queueFull = 0; long start; - public Producer(MpmcExchangerQueueAlt queue) { + public Producer(MpmcExchangerQueueAlt queue) { this.queue = queue; } @Override public void run() { - MpmcExchangerQueueAlt producer = this.queue; + MpmcExchangerQueueAlt producer = this.queue; int i = REPETITIONS; int f = 0; long s = System.nanoTime(); - Node result; + Object result; do { - producer.put(); + producer.put(TEST_VALUE); } while (0 != --i); this.queueFull = f; this.start = s; diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java index cc639e2..19322aa 100755 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselinePerfTest.java @@ -15,19 +15,18 @@ */ package dorkbox.util.messagebus; -import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; public class MpmcQueueBaselinePerfTest { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; - public static final Node TEST_VALUE = new Node(Integer.valueOf(777)); + public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static void main(final String[] args) throws Exception { System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final MpmcArrayQueue> queue = new MpmcArrayQueue>(QUEUE_CAPACITY); + final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -42,13 +41,13 @@ public class MpmcQueueBaselinePerfTest { System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); } - private static long performanceRun(int runNumber, MpmcArrayQueue> queue) throws Exception { + private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { Producer p = new Producer(queue); Thread thread = new Thread(p); thread.start(); // producer will timestamp start - MpmcArrayQueue> consumer = queue; - Node result; + MpmcArrayQueue consumer = queue; + Object result; int i = REPETITIONS; int queueEmpty = 0; do { @@ -64,22 +63,22 @@ public class MpmcQueueBaselinePerfTest { long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, - qName, result.item, queueEmpty, p.queueFull); + qName, result, queueEmpty, p.queueFull); return ops; } public static class Producer implements Runnable { - private final MpmcArrayQueue> queue; + private final MpmcArrayQueue queue; int queueFull = 0; long start; - public Producer(MpmcArrayQueue> queue) { + public Producer(MpmcArrayQueue queue) { this.queue = queue; } @Override public void run() { - MpmcArrayQueue> producer = this.queue; + MpmcArrayQueue producer = this.queue; int i = REPETITIONS; int f = 0; long s = System.nanoTime(); diff --git a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java index db9474f..bc7b375 100644 --- a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java @@ -103,7 +103,7 @@ public class PerformanceTest { long s = System.nanoTime(); try { do { - producer.transfer(TEST_VALUE); + producer.put(TEST_VALUE); } while (0 != --i); } catch (InterruptedException ignored) { }