WIP - 64Mop/s w/ node

This commit is contained in:
nathan 2015-04-10 13:10:10 +02:00
parent f6bf52c57f
commit 72cc6d15d4
10 changed files with 289 additions and 126 deletions

View File

@ -449,7 +449,7 @@ public class MultiMBassador implements IMessageBus {
try {
// this.dispatchQueue.transfer(runnable);
this.dispatchQueue.transfer(message);
this.dispatchQueue.put(message);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
@ -470,7 +470,7 @@ public class MultiMBassador implements IMessageBus {
};
try {
this.dispatchQueue.transfer(runnable);
this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
@ -492,7 +492,7 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(runnable);
this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")

View File

@ -21,7 +21,6 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
long p30, p31, p32, p33, p34, p35, p36, p37;
/** Creates a {@code MpmcExchangerQueue} that is initially empty. */
public MpmcExchangerQueue(final HandlerFactory<M> factory, final int size) {
super(size);
@ -33,32 +32,23 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long currentProducerIndex;
long pSeqOffset;
long currentProducerIndex;
for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, new Node<M>(factory.newInstance()));
} else {
// something is seriously wrong. This should never happen.
throw new RuntimeException("Unable to prefill exchangerQueue");
}
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, new Node<M>(factory.newInstance()));
}
pSeqOffset = calcSequenceOffset(0, mask);
soSequence(sBuffer, pSeqOffset, 0); // StoreStore
}
/**
* PRODUCER
* @return null iff queue is full
*/
public Node<M> put() {
// local load of field to avoid repeated loads after volatile reads
@ -76,7 +66,7 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
@ -90,7 +80,6 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
final Node<M> e = lpElement(offset);
// increment sequence by 1, the value expected by consumer
@ -104,8 +93,8 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return null;
// return null;
busySpin();
}
// another producer has moved the sequence by one, retry 2
@ -117,7 +106,6 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
/**
* CONSUMER
* @return null iff empty
*/
public Node<M> take() {
// local load of field to avoid repeated loads after volatile reads
@ -135,7 +123,8 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
// currentProducerIndex = lvProducerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
@ -160,9 +149,10 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// contention. we WILL have data in the Q, we just got to it too quickly
return null;
busySpin();
}
// another consumer beat us and moved sequence ahead, retry 2
@ -170,7 +160,7 @@ public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Nod
}
}
private void busySpin() {
private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
for (;;) {

View File

@ -2,7 +2,7 @@ package dorkbox.util.messagebus.common.simpleq;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<Node<M>> {
public final class MpmcExchangerQueueAlt extends MpmcArrayQueueConsumerField<Node> {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -20,19 +20,11 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
private final Node<M> newInstance;
/** Creates a {@code MpmcExchangerQueueAlt} that is initially empty. */
public MpmcExchangerQueueAlt(final HandlerFactory<M> factory, final int size) {
public MpmcExchangerQueueAlt(final int size) {
super(size);
this.newInstance = new Node<M>(factory.newInstance());
//////////////
// pre-fill our data structures
//////////////
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
@ -45,29 +37,33 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, this.newInstance);
spElement(elementOffset, new Node());
}
pSeqOffset = calcSequenceOffset(0, mask);
soSequence(sBuffer, pSeqOffset, 0); // StoreStore
}
/**
* PRODUCER
* @return null iff queue is full
*/
public Node<M> put() {
public void put(Object item) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
long currentProducerIndex;
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
@ -79,13 +75,14 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
final Node<M> e = lpElement(offset);
final Node e = lpElement(offset);
e.setMessage1(item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
return e;
return;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
@ -107,17 +104,25 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
* CONSUMER
* @return null iff empty
*/
public Node<M> take() {
public Object take() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
long currentProducerIndex;
long cSeqOffset;
long pIndex = -1; // start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentProducerIndex = lvProducerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currentConsumerIndex + 1);
@ -128,13 +133,13 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Node<M> e = lpElement(offset);
final Node e = lpElement(offset);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore
return e;
return e.getMessage1();
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
@ -165,17 +170,17 @@ public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<
}
@Override
public boolean offer(Node<M> message) {
public boolean offer(Node message) {
return false;
}
@Override
public Node<M> poll() {
public Node poll() {
return null;
}
@Override
public Node<M> peek() {
public Node peek() {
return null;
}

View File

@ -1,33 +1,73 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
// also, to increase performance due to false sharing/cache misses -- edit
// the system property "sparse.shift"
// Improve likelihood of isolation on <= 64 byte cache lines
public class Node<E> {
// java classes DO NOT mix their fields, so if we want specific padding, in a specific order, we must
// subclass them. All Intel CPU L1/L2/L3 cache line sizes are 64 bytes
// Improve likelihood of isolation on <= 64 byte cache lines
// see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html
public long x0, x1, x2, x3, x4, x5, x6, x7;
// see: http://psy-lob-saw.blogspot.de/2013/05/know-thy-java-object-memory-layout.html
// see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html
public volatile E item;
abstract class NodeVal {
public static final short FREE = 0;
public static final short INIT = 1;
public static final short DONE = 2;
}
public long y0, y1, y2, y3, y4, y5, y6, y7;
abstract class NodeState {
/** item stored in the node */
public volatile short state = NodeVal.FREE;
}
abstract class NodePad2 extends NodeState {
// volatile long y0, y1, y2, y3, y4, y5, y6 = 7L;
}
abstract class NodeItem extends NodePad2 {
/** items stored in the node */
// public volatile MessageType messageType = MessageType.ONE;
public volatile Object message1 = null;
// public Object message2 = null;
// public Object message3 = null;
// public Object[] messages = null;
}
abstract class NodePad3 extends NodeItem {
// volatile long z0, z1, z2, z3, z4, z5, z6 = 7L;
}
abstract class NodeWaiter extends NodePad3 {
/** The Thread waiting to be signaled to wake up*/
public AtomicReference<Thread> waiter = new AtomicReference<Thread>();
// public AtomicReference<Thread> waiter = new AtomicReference<Thread>();
}
public long z0, z1, z2, z3, z4, z5, z6, z7;
public Node(E item) {
this.item = item;
public class Node extends NodeWaiter {
private final static long MESSAGE1_OFFSET;
static {
try {
MESSAGE1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("message1"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
// prevent JIT from optimizing away the padding
public final long sum() {
return this.x0 + this.x1 + this.x2 + this.x3 + this.x4 + this.x5 + this.x6 + this.x7 +
this.y0 + this.y1 + this.y2 + this.y3 + this.y4 + this.y5 + this.y6 + this.y7 +
this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7;
public Node() {
}
public void setMessage1(Object item) {
UNSAFE.putObject(this, MESSAGE1_OFFSET, item);
}
public Object getMessage1() {
return UNSAFE.getObject(this, MESSAGE1_OFFSET);
}
}

View File

@ -1,13 +1,14 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
public final class SimpleQueue<M extends MessageHolder> {
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
public final class SimpleQueue<M extends MessageHolder> extends MpmcArrayQueueConsumerField<Node<M>> {
static {
// Prevent rare disastrous classloading in first call to LockSupport.park.
@ -20,7 +21,6 @@ public final class SimpleQueue<M extends MessageHolder> {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
@ -28,24 +28,119 @@ public final class SimpleQueue<M extends MessageHolder> {
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 600;
private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000
private static final int SIZE = 1<<14;
private final MpmcExchangerQueueAlt<M> consumersWaiting;
private final MpmcExchangerQueueAlt<M> producersWaiting;
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
private final AtomicInteger currentCount = new AtomicInteger(0);
private final int numberConsumerThreads;
public SimpleQueue(int numberConsumerThreads, HandlerFactory<M> factory) {
super(SIZE);
this.numberConsumerThreads = numberConsumerThreads;
this.consumersWaiting = new MpmcExchangerQueueAlt<M>(factory, 1<<14);
this.producersWaiting = new MpmcExchangerQueueAlt<M>(factory, 1<<14);
// pre-fill our data structures
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long pSeqOffset;
long currentProducerIndex;
for (currentProducerIndex = 0; currentProducerIndex < SIZE; currentProducerIndex++) {
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, new Node<M>(factory.newInstance()));
}
pSeqOffset = calcSequenceOffset(0, mask);
soSequence(sBuffer, pSeqOffset, 0); // StoreStore
}
public void transfer(Object message1) throws InterruptedException {
/**
* PRODUCER
*/
public Node<M> put() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
long currentProducerIndex;
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
final Node<M> e = lpElement(offset);
// if only item on Q, WAIT
if (currentConsumerIndex == currentProducerIndex && lvConsumerIndex() == lvProducerIndex()) {
} else {
}
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
return e;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return null;
busySpin();
}
// another producer has moved the sequence by one, retry 2
// only producer will busySpin if contention
// busySpin();
}
}
public void putOLD(Object message1) throws InterruptedException {
// decrement count
// <0: no consumers available, add to Q, park and wait
// >=0: consumers available, get one from the parking lot
@ -87,7 +182,7 @@ public final class SimpleQueue<M extends MessageHolder> {
}
}
public void take(MessageHolder item) throws InterruptedException {
public void takeOLD(MessageHolder item) throws InterruptedException {
// increment count
// >=0: no producers available, park and wait
// <0: producers available, get one from the Q
@ -103,6 +198,10 @@ public final class SimpleQueue<M extends MessageHolder> {
if (!park(consumer, myThread)) {
throw new InterruptedException();
}
if (consumer.item == null || consumer.item.message1 == null) {
System.err.println("KAPOW");
}
item.message1 = consumer.item.message1;
return;
} else {
@ -116,6 +215,10 @@ public final class SimpleQueue<M extends MessageHolder> {
item.message1 = producer.item.message1;
unpark(producer, myThread);
if (item.message1 == null) {
System.err.println("KAPOW");
}
return;
}
}
@ -217,9 +320,30 @@ public final class SimpleQueue<M extends MessageHolder> {
public boolean hasPendingMessages() {
// count the number of consumers waiting, it should be the same as the number of threads configured
return this.consumersWaiting.size() == this.numberConsumerThreads;
// return this.consumersWaiting.size() == this.numberConsumerThreads;
return false;
}
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
}
@Override
public boolean offer(Node<M> message) {
return false;
}
@Override
public Node<M> poll() {
return null;
}
@Override
public Node<M> peek() {
return null;
}
@Override
public int size() {
return 0;
}
}

View File

@ -19,6 +19,8 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE
import java.util.AbstractQueue;
import java.util.Iterator;
import dorkbox.util.messagebus.common.simpleq.Node;
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
@ -27,7 +29,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> imp
/**
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
* the array is padded on either side to help with False sharing prevention. It is expected that subclasses handle post
* padding.
* <p>
* Offset calculation is separate from access to enable the reuse of a given computed offset.
@ -41,8 +43,8 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> imp
* @param <E>
*/
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
protected static final int BUFFER_PAD = 32;
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
protected static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
static {
@ -54,6 +56,8 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
} else {
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
+ (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT);
@ -67,7 +71,7 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = actualCapacity - 1;
// pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
this.buffer = (E[]) new Node[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}
/**

View File

@ -14,6 +14,7 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
@ -34,11 +35,8 @@ package dorkbox.util.messagebus.common.simpleq.jctools;
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
* @param <E>
* type of the element stored in the {@link java.util.Queue}
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
public class MpmcArrayQueue extends MpmcArrayQueueConsumerField<Object> {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -67,7 +65,7 @@ public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
}
@Override
public boolean offer(final E e) {
public boolean offer(final Object e) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
@ -118,7 +116,7 @@ public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
* and must test producer index when next element is not visible.
*/
@Override
public E poll() {
public Object poll() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
@ -139,7 +137,7 @@ public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final E e = lpElement(offset);
final Object e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
@ -162,9 +160,9 @@ public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
}
@Override
public E peek() {
public Object peek() {
long currConsumerIndex;
E e;
Object e;
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty

View File

@ -15,9 +15,7 @@
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.simpleq.HandlerFactory;
import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueueAlt;
import dorkbox.util.messagebus.common.simpleq.Node;
public class MpmcQueueAltPerfTest {
// 15 == 32 * 1024
@ -27,16 +25,22 @@ public class MpmcQueueAltPerfTest {
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static void main(final String[] args) throws Exception {
// long offset = 0;
// try {
// long nextWriteOffset = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("message1"));
// long lastReadOffset = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("waiter"));
// offset = Math.abs(nextWriteOffset - lastReadOffset);
// junit.framework.Assert.assertTrue(offset >= 64);
//
// } catch (NoSuchFieldException | SecurityException e) {
// e.printStackTrace();
// }
// System.out.println("Checking false sharing. Offset is: " + offset);
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
HandlerFactory<Integer> factory = new HandlerFactory<Integer>() {
@Override
public Integer newInstance() {
return Integer.valueOf(777);
}
};
final MpmcExchangerQueueAlt<Integer> queue = new MpmcExchangerQueueAlt<Integer>(factory, QUEUE_CAPACITY);
final MpmcExchangerQueueAlt queue = new MpmcExchangerQueueAlt(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -52,15 +56,15 @@ public class MpmcQueueAltPerfTest {
}
private static long performanceRun(int runNumber, MpmcExchangerQueueAlt<Integer> queue) throws Exception {
private static long performanceRun(int runNumber, MpmcExchangerQueueAlt queue) throws Exception {
// for (int i=0;i<CONCURRENCY_LEVEL;i++) {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
// }
MpmcExchangerQueueAlt<Integer> consumer = queue;
Node<Integer> result;
MpmcExchangerQueueAlt consumer = queue;
Object result;
int i = REPETITIONS;
int queueEmpty = 0;
do {
@ -72,30 +76,29 @@ public class MpmcQueueAltPerfTest {
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
Integer finalMessage = result.item;
System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, finalMessage, queueEmpty, p.queueFull);
qName, result, queueEmpty, p.queueFull);
return ops;
}
public static class Producer implements Runnable {
private final MpmcExchangerQueueAlt<Integer> queue;
private final MpmcExchangerQueueAlt queue;
int queueFull = 0;
long start;
public Producer(MpmcExchangerQueueAlt<Integer> queue) {
public Producer(MpmcExchangerQueueAlt queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcExchangerQueueAlt<Integer> producer = this.queue;
MpmcExchangerQueueAlt producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();
Node<Integer> result;
Object result;
do {
producer.put();
producer.put(TEST_VALUE);
} while (0 != --i);
this.queueFull = f;
this.start = s;

View File

@ -15,19 +15,18 @@
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
public class MpmcQueueBaselinePerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Node<Integer> TEST_VALUE = new Node<Integer>(Integer.valueOf(777));
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue<Node<Integer>> queue = new MpmcArrayQueue<Node<Integer>>(QUEUE_CAPACITY);
final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -42,13 +41,13 @@ public class MpmcQueueBaselinePerfTest {
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, MpmcArrayQueue<Node<Integer>> queue) throws Exception {
private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
MpmcArrayQueue<Node<Integer>> consumer = queue;
Node<Integer> result;
MpmcArrayQueue consumer = queue;
Object result;
int i = REPETITIONS;
int queueEmpty = 0;
do {
@ -64,22 +63,22 @@ public class MpmcQueueBaselinePerfTest {
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, result.item, queueEmpty, p.queueFull);
qName, result, queueEmpty, p.queueFull);
return ops;
}
public static class Producer implements Runnable {
private final MpmcArrayQueue<Node<Integer>> queue;
private final MpmcArrayQueue queue;
int queueFull = 0;
long start;
public Producer(MpmcArrayQueue<Node<Integer>> queue) {
public Producer(MpmcArrayQueue queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcArrayQueue<Node<Integer>> producer = this.queue;
MpmcArrayQueue producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();

View File

@ -103,7 +103,7 @@ public class PerformanceTest {
long s = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException ignored) {
}