WIP - got queue to no longer deadlock

This commit is contained in:
nathan 2015-04-19 14:25:12 +02:00
parent 2e6ba71eed
commit 42230706c7
20 changed files with 594 additions and 1509 deletions

@ -1,10 +0,0 @@
dependencies {
testCompile "junit:junit:4.10@jar"

@ -1,742 +0,0 @@
@ -1,281 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
Mbassador is a fast and flexible message bus system following the publish subscribe pattern.
It is designed for ease of use and aims to be feature rich and extensible
while preserving resource efficiency and performance.
It features:
declarative handler definition via annotations,
sync and/or async message delivery,
message filtering,
ordering of message handlers etc.
<name>MIT license</name>
<name>Benjamin Diedrichsen</name>
<!-- Local repository (for testing)
<!-- plugin>
</plugin -->
<!-- exclude the suite which is a convenience class for running all tests from IDE or using scripts -->
<!-- bind the source attaching to package phase -->
These two plugins take care of building and publishing the javadoc, using
mvn clean javadoc:javadoc scm-publish:publish-scm
<header>mbassador, ${project.version}</header>
<footer>mbassador, ${project.version}</footer>
<doctitle>mbassador, ${project.version}</doctitle>
<checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
@ -68,7 +68,7 @@ public class DisruptorQueue {
try { try {
// System.err.println("+(" + seq + ") " + message1); // System.err.println("+(" + seq + ") " + message1);
MessageHolder eventJob = ringBuffer.get(seq); MessageHolder eventJob = ringBuffer.get(seq);
eventJob.messageType = MessageTypeOLD.ONE;
eventJob.message1 = message1; eventJob.message1 = message1;
// eventJob.message2 = message2; // eventJob.message2 = message2;
View File

@ -21,7 +21,7 @@ public class EventProcessor2 implements WorkHandlerEarlyRelease<MessageHolder> {
@Override @Override
public void onEvent(long sequence, MessageHolder event) throws Exception { public void onEvent(long sequence, MessageHolder event) throws Exception {
MessageTypeOLD messageType = event.messageType;
switch (messageType) { switch (messageType) {
case ONE: { case ONE: {
Object message1 = event.message1; Object message1 = event.message1;

@ -5,7 +5,7 @@ package com.lmax.disruptor;
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
*/ */
public class MessageHolder { public class MessageHolder {
public MessageTypeOLD messageType = MessageTypeOLD.ONE;
public Object message1 = null; public Object message1 = null;
View File

@ -3,6 +3,9 @@ package com.lmax.disruptor;
* @author dorkbox, llc * @author dorkbox, llc
* Date: 2/2/15 * Date: 2/2/15
*/ */
public enum MessageType { public class MessageType {
ONE, TWO, THREE, ARRAY public static final short ONE = 1;
public static final short TWO = 2;
public static final short THREE = 3;
public static final short ARRAY = 4;
View File

@ -0,0 +1,8 @@
package com.lmax.disruptor;
* @author dorkbox, llc
* Date: 2/2/15
View File

@ -13,6 +13,7 @@ import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.simpleq.HandlerFactory; import dorkbox.util.messagebus.common.simpleq.HandlerFactory;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue; import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
@ -83,9 +84,11 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(boolean forceExactMatches, int numberOfThreads) { public MultiMBassador(boolean forceExactMatches, int numberOfThreads) {
if (numberOfThreads < 1) { if (numberOfThreads < 2) {
numberOfThreads = 1; // at LEAST 1 thread numberOfThreads = 2; // at LEAST 2 threads
} }
numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads);
this.numberOfThreads = numberOfThreads; this.numberOfThreads = numberOfThreads;
// this.dispatchQueue = new DisruptorQueue(this, numberOfThreads); // this.dispatchQueue = new DisruptorQueue(this, numberOfThreads);
@ -97,7 +100,7 @@ public class MultiMBassador implements IMessageBus {
} }
}; };
this.dispatchQueue = new SimpleQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads);
this.threads = new ArrayDeque<Thread>(numberOfThreads); this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -108,30 +111,17 @@ public class MultiMBassador implements IMessageBus {
Runnable runnable = new Runnable() { Runnable runnable = new Runnable() {
@Override @Override
public void run() { public void run() {
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
Object message1;
while (true) {
message1 = IN_QUEUE.take();
publish(message1);
}
// Runnable event = null;
int spins;
Object message1; Object message1;
try {
while (true) { while (true) {
// spins = maxSpins;
// while ((event = IN_QUEUE.poll()) == null) {
// if (spins > 100) {
// --spins;
// } else if (spins > 0) {
// --spins;
// LockSupport.parkNanos(1L);
// } else {
message1 = IN_QUEUE.take(); message1 = IN_QUEUE.take();
// break;
// }
// }
// IN_QUEUE.release(event);
// event.run();
publish(message1); publish(message1);
} catch (InterruptedException e) {
} }
} }
}; };
@ -441,8 +431,15 @@ public class MultiMBassador implements IMessageBus {
// } // }
// }; // };
try {
// this.dispatchQueue.transfer(runnable); // this.dispatchQueue.transfer(runnable);
this.dispatchQueue.put(message); this.dispatchQueue.put(message);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
} }
} }
@ -456,7 +453,14 @@ public class MultiMBassador implements IMessageBus {
} }
}; };
try {
this.dispatchQueue.put(runnable); this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setPublishedObject(message1, message2));
} }
} }
@ -471,7 +475,14 @@ public class MultiMBassador implements IMessageBus {
}; };
try {
this.dispatchQueue.put(runnable); this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setPublishedObject(message1, message2, message3));
} }
View File

@ -1,32 +1,33 @@
package dorkbox.util.messagebus.common.simpleq; package dorkbox.util.messagebus.common.simpleq;
import com.lmax.disruptor.MessageType;
// mpmc sparse.shift = 2, for this to be fast. // mpmc sparse.shift = 2, for this to be fast.
abstract class PrePad { abstract class PrePad {
volatile long z0, z1, z2, z4, z5, z6 = 7L; // volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
abstract class ColdItems extends PrePad { abstract class ColdItems {
public short nodeType = NodeType.FREE; public short type = MessageType.ONE;
public boolean isConsumer = false;
public Object item1 = null; public Object item1 = null;
public Object item2 = null; public Object item2 = null;
public Object item3 = null; public Object item3 = null;
public Object[] item4 = null; public Object[] item4 = null;
} }
abstract class Pad1 extends ColdItems { abstract class Pad0 extends ColdItems {
volatile long z0, z1, z2, z4, z5, z6 = 7L; // volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
public volatile Thread thread;
public Thread thread; public volatile Thread thread;
} }
public class Node extends HotItem1 { public class Node extends HotItem1 {
// post-padding // post-padding
volatile long z0, z1, z2, z4, z5, z6 = 7L; // volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() { public Node() {
} }

@ -0,0 +1,8 @@
package dorkbox.util.messagebus.common.simpleq;
public class NodeState {
public static final short FREE = 0;
public static final short CANCELLED = 1;
View File

@ -1,13 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
public class NodeType {
public static final short FREE = 0;
public static final short P_INIT = 1;
public static final short P_DONE = 2;
public static final short C_INIT = 1;
public static final short C_DONE = 2;
View File

@ -5,30 +5,95 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> { public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
private final static long THREAD;
private static final long ITEM1_OFFSET;
private static final long TYPE;
private static final long CONSUMER;
// private final static long NODE_OFFSET;
private final static long MESSAGE1_OFFSET;
static { static {
try { try {
// NODE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("n")); CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
MESSAGE1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
} catch (NoSuchFieldException e) { } catch (NoSuchFieldException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// Prevent rare disastrous classloading in first call to LockSupport.park.
// See: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
} }
private static final void spIsConsumer(Object node, boolean value) {
UNSAFE.putBoolean(node, CONSUMER, value);
private static final boolean lpIsConsumer(Object node) {
return UNSAFE.getBoolean(node, CONSUMER);
private static final boolean lvIsConsumer(Object node) {
return UNSAFE.getBooleanVolatile(node, CONSUMER);
private static final void spType(Object node, short type) {
UNSAFE.putShort(node, TYPE, type);
private static final short lpType(Object node) {
return UNSAFE.getShort(node, TYPE);
private static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item);
private static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1_OFFSET, item);
private static final Object lvItem1(Object node) {
return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET);
private static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1_OFFSET);
private static final Thread lvThread(Object node) {
return (Thread) UNSAFE.getObjectVolatile(node, THREAD);
private static final Thread lpThread(Object node) {
return (Thread) UNSAFE.getObject(node, THREAD);
private static final Object cancelledMarker = new Object();
private static final boolean lpIsCanceled(Object node) {
return cancelledMarker == UNSAFE.getObject(node, THREAD);
private static final void spIsCancelled(Object node) {
UNSAFE.putObject(node, THREAD, cancelledMarker);
private static final void soThread(Object node, Thread newValue) {
UNSAFE.putOrderedObject(node, THREAD, newValue);
private static final void spThread(Object node, Thread newValue) {
UNSAFE.putObject(node, THREAD, newValue);
private static final boolean casThread(Object node, Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue);
/** The number of CPUs */ /** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -41,28 +106,36 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
*/ */
private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000
private static final int SIZE = 1<<14; /**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
static final int maxTimedSpins = NCPU < 2 ? 0 : 32;
static final int negMaxTimedSpins = -maxTimedSpins;
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
static final int maxUntimedSpins = maxTimedSpins * 16;
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
static final long spinForTimeoutThreshold = 1000L;
long p40, p41, p42, p43, p44, p45, p46; long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37; long p30, p31, p32, p33, p34, p35, p36, p37;
public SimpleQueue(final int size) {
super(size);
// if (currentConsumerIndex == currentProducerIndex) {
// // automatically park, since we are the first one on the Q
// }
// // other consumers may have grabbed the element, or queue might be empty
// Node fbject = lpElement(calcElementOffset(currentConsumerIndex));
private final int numberConsumerThreads;
public SimpleQueue(int numberConsumerThreads, int size) {
super(size); super(size);
this.numberConsumerThreads = numberConsumerThreads;
// pre-fill our data structures // pre-fill our data structures
@ -77,105 +150,41 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
} }
} }
/** /**
*/ */
public void put(Object item) { public void put(Object item) throws InterruptedException {
xfer(item, false, 0);
public Object take() throws InterruptedException {
return xfer(null, false, 0);
private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException {
boolean isConsumer = item == null;
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final long capacity = this.mask + 1; final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer; final long[] sBuffer = this.sequenceBuffer;
// long currentConsumerIndex;
long currentProducerIndex;
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
Node lpElement = lpElement(offset);
setMessage1(lpElement, item);
// lpspElement(offset, fakeVal, NODE_OFFSET, MESSAGE1_OFFSET);
// spElement(offset, item);
// lpElement(offset);
// ((Node)e).setMessage1(null);
// node.setMessage1(((Node)item).getMessage1());
// Object lpElement = lpElement(offset);
// setMessage1(lpElement, 445);
// this.fakeNode.setMessage1(Integer.valueOf(12));
// e.setMessage1(((Node)item).item);
////// NodeState2 message1 = e.getMessage1();
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return null;
// another producer has moved the sequence by one, retry 2
// only producer will busySpin if contention
// busySpin();
private static final void setMessage1(Object node, Object item) {
// final Object o = UNSAFE.getObject(node, NODE_OFFSET);
UNSAFE.putObject(node, MESSAGE1_OFFSET, item);
private static final Object getMessage1(Object node) {
// final Object o = UNSAFE.getObject(node, NODE_OFFSET);
return UNSAFE.getObject(node, MESSAGE1_OFFSET);
* @return null iff empty
public Object take() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex; long currentConsumerIndex;
// long currentProducerIndex; long currentProducerIndex;
long cSeqOffset; long cSeqOffset;
long pIndex = -1; // start with bogus value, hope we don't need it long pSeqOffset;
long pSeq;
long pDelta = -1;
boolean sameMode = false;
boolean empty = false;
while (true) { while (true) {
// Order matters! // Order matters!
@ -183,246 +192,271 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is // This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method. // nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentProducerIndex = lvProducerIndex(); // LoadLoad currentProducerIndex = lvProducerIndex(); // LoadLoad
// empty or same mode
// check what was last placed on the queue
if (currentProducerIndex == currentConsumerIndex) {
empty = true;
} else {
final long previousProducerIndex = currentProducerIndex - 1;
cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad
final long delta = seq - (currentConsumerIndex + 1); final long ppDelta = ppSeq - previousProducerIndex;
if (delta == 0) { if (ppDelta == 1) {
if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { // same mode check
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset // on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask); final long offset = calcElementOffset(previousProducerIndex, mask);
final Object e = lpElement(offset); Object element = lpElement(offset);
Object item = getMessage1(e); sameMode = lpIsConsumer(element) == isConsumer;
// final Node node = (Node)e; } else if (ppDelta < 1 && // slot has not been moved by producer
// currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
// Object item = node.getMessage1(); currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// soElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer // is empty
// (seeing this value from a consumer will lead to retry 2) empty = true;
soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore } else {
// hasn't been moved yet. retry 2
return item;
if (empty || sameMode) {
// push+park onto queue
// we add ourselves to the queue and wait
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
pDelta = pSeq - currentProducerIndex;
if (pDelta == 0) {
// this is expected if we see this first time around
final long nextProducerIndex = currentProducerIndex + 1;
if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
// Successful CAS: full barrier
// it is possible that two threads check the queue at the exact same time,
// BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is empty.
currentConsumerIndex = lvConsumerIndex();
if (empty && currentProducerIndex != currentConsumerIndex) {
// RESET the push of this element.
empty = false;
casProducerIndex(nextProducerIndex, currentProducerIndex);
} else if (sameMode && currentProducerIndex == currentConsumerIndex) {
sameMode = false;
casProducerIndex(nextProducerIndex, currentProducerIndex);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
final Object element = lpElement(offset);
spIsConsumer(element, isConsumer);
spThread(element, Thread.currentThread());
if (isConsumer) {
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait
park(element, timed, nanos);
return lvItem1(element);
} else {
spItem1(element, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// now we wait
park(element, timed, nanos);
return null;
} }
// failed cas, retry 1 // failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer } else if (pDelta < 0 && // poll has not moved this value forward
currentConsumerIndex >= pIndex && // test against cached pIndex currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()] // Extra check required to ensure [Queue.offer == false iff queue is full]
// return null; // return;
// contention. we WILL have data in the Q, we just got to it too quickly
busySpin(); busySpin();
} }
} else {
// complementary mode
// another consumer beat us and moved sequence ahead, retry 2 // get item
// only producer busyspins cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long nextConsumerIndex = currentConsumerIndex + 1;
final long cDelta = cSeq - nextConsumerIndex;
if (cDelta == 0) {
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Object element = lpElement(offset);
final Thread thread = lpThread(element);
if (isConsumer) {
if (thread == null || // is cancelled/fulfilled already
!casThread(element, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
// success
Object item1 = lpItem1(element);
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return item1;
} else {
soItem1(element, item);
if (thread == null || // is cancelled/fulfilled already
!casThread(element, thread, null)) { // failed cas state
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
// success
// pop off queue
if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// Successful CAS: full barrier
soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
return null;
// lost CAS
} else if (cDelta < 0 && // slot has not been moved by producer
currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// contention.
} }
} }
private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = maxUntimedSpins;
for (;;) {
if (spins > 0) {
} else {
public void putOLD(Object message1) throws InterruptedException { break;
// // decrement count }
// // <0: no consumers available, add to Q, park and wait
// // >=0: consumers available, get one from the parking lot
// Thread myThread = Thread.currentThread();
// for (;;) {
// final int count = this.currentCount.get();
// if (this.currentCount.compareAndSet(count, count - 1)) {
// if (count <= 0) {
// // <=0: no consumers available (PUSH_P, PARK_P)
// Node<M> producer = this.producersWaiting.put();
// if (producer == null || producer.item == null) {
// System.err.println("KAPOW");
// }
// producer.item.message1 = message1;
// if (!park(producer, myThread)) {
// throw new InterruptedException();
// }
// return;
// } else {
// // >0: consumers available (TAKE_C, UNPARK_C)
// Node<M> consumer = this.consumersWaiting.take();
// while (consumer == null) {
//// busySpin();
// consumer = this.consumersWaiting.take();
// }
// consumer.item.message1 = message1;
// unpark(consumer, myThread);
// return;
// }
// }
// // contention
// busySpin();
// }
} }
public void takeOLD(MessageHolder item) throws InterruptedException {
// // increment count
// // >=0: no producers available, park and wait
// // <0: producers available, get one from the Q
// Thread myThread = Thread.currentThread();
// for (;;) {
// final int count = this.currentCount.get();
// if (this.currentCount.compareAndSet(count, count + 1)) {
// if (count >= 0) {
// // >=0: no producers available (PUT_C, PARK_C)
// Node<M> consumer = this.consumersWaiting.put();
// if (!park(consumer, myThread)) {
// throw new InterruptedException();
// }
// if (consumer.item == null || consumer.item.message1 == null) {
// System.err.println("KAPOW");
// }
// item.message1 = consumer.item.message1;
// return;
// } else {
// // <0: producers available (TAKE_P, UNPARK_P)
// Node<M> producer = this.producersWaiting.take();
// while (producer == null) {
//// busySpin();
// producer = this.producersWaiting.take();
// }
// item.message1 = producer.item.message1;
// unpark(producer, myThread);
// if (item.message1 == null) {
// System.err.println("KAPOW");
// }
// return;
// }
// }
// // contention
// busySpin();
// }
} }
/** /**
* @param myThread * @param myThread
* @return
* @return false if we were interrupted, true if we were unparked by another thread * @return false if we were interrupted, true if we were unparked by another thread
*/ */
private boolean park(Node myNode, Thread myThread) { private static final boolean park(Object myNode, boolean timed, long nanos) throws InterruptedException {
PaddedObject<Thread> waiter = myNode.waiter; // long lastTime = timed ? System.nanoTime() : 0;
Thread thread; // int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = maxUntimedSpins;
Thread myThread = Thread.currentThread();
// if (timed) {
// long now = System.nanoTime();
// nanos -= now - lastTime;
// lastTime = now;
// if (nanos <= 0) {
//// s.tryCancel(e);
// continue;
// }
// }
for (;;) {
thread = waiter.get();
if (waiter.compareAndSet(thread, myThread)) {
if (thread == null) {
// busy spin for the amount of time (roughly) of a CPU context switch // busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS; // then park (if necessary)
int spin = spins;
for (;;) { for (;;) {
if (spins > 0) { if (lvThread(myNode) == null) {
--spins; return true;
} else if (waiter.get() != myThread) { } else if (spin > 0) {
break; --spin;
} else if (spin > negMaxTimedSpins) {
} else { } else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park(); LockSupport.park();
if (myThread.isInterrupted()) { if (myThread.isInterrupted()) {
waiter.set(null); casThread(myNode, myThread, null);
return false; return false;
} }
break; }
} }
} }
// do { // /**
// // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. // * Unparks the other node (if it was waiting)
// LockSupport.park(); // */
// if (myThread.isInterrupted()) { // private static final void unpark(Object otherNode) {
// myNode.waiter.set(null); // Thread myThread = Thread.currentThread();
// return false; // Thread thread;
// for (;;) {
// thread = getThread(otherNode);
// if (threadCAS(otherNode, thread, myThread)) {
// if (thread == null) {
// // no parking (UNPARK won the race)
// return;
// } else if (thread != myThread) {
// // park will always set the waiter back to null
// LockSupport.unpark(thread);
// return;
// } else {
// // contention
// busySpin();
// }
// }
// }
// } // }
// } while (myNode.waiter.get() == myThread);
return true;
} else if (thread != myThread) {
// no parking
return true;
} else {
// contention
* Unparks the other node (if it was waiting)
private void unpark(Node otherNode, Thread myThread) {
PaddedObject<Thread> waiter = otherNode.waiter;
Thread thread;
for (;;) {
thread = waiter.get();
if (waiter.compareAndSet(thread, myThread)) {
if (thread == null) {
// no parking
} else if (thread != myThread) {
// park will always set the waiter back to null
} else {
// contention
private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
for (;;) {
if (spins > 0) {
} else {
public boolean hasPendingMessages() {
// count the number of consumers waiting, it should be the same as the number of threads configured
// return this.consumersWaiting.size() == this.numberConsumerThreads;
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() != lvProducerIndex();
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
@Override @Override
public boolean offer(Node message) { public boolean offer(Node message) {
@ -443,4 +477,24 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
public int size() { public int size() {
return 0; return 0;
} }
public boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() == lvProducerIndex();
public boolean hasPendingMessages() {
// count the number of consumers waiting, it should be the same as the number of threads configured
// return this.consumersWaiting.size() == this.numberConsumerThreads;
return false;
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
View File

@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> imp
* @param <E> * @param <E>
*/ */
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> { public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
protected static final int BUFFER_PAD; protected static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE; private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT; private static final int REF_ELEMENT_SHIFT;

View File

@ -101,11 +101,11 @@ public class MpmcArrayQueue<T> extends MpmcArrayQueueConsumerField<T> {
currentProducerIndex - capacity <= cIndex && // test against cached cIndex currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full] // Extra check required to ensure [Queue.offer == false iff queue is full]
// return false; return false;
} }
// another producer has moved the sequence by one, retry 2 // another producer has moved the sequence by one, retry 2
} }
} }
@ -151,11 +151,11 @@ public class MpmcArrayQueue<T> extends MpmcArrayQueueConsumerField<T> {
currentConsumerIndex >= pIndex && // test against cached pIndex currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()] // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null; return null;
} }
// another consumer beat us and moved sequence ahead, retry 2 // another consumer beat us and moved sequence ahead, retry 2
// only producer will busy spin
} }
} }

@ -18,12 +18,13 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport; import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue;
import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
public class MpmcQueueAltPerfTest { public class MpmcQueueAltPerfTest {
// 15 == 32 * 1024 // 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
@ -31,10 +32,8 @@ public class MpmcQueueAltPerfTest {
System.out.println(VMSupport.vmDetails()); System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY);
final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY);
final long[] results = new long[20]; final long[] results = new long[20];
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
@ -49,19 +48,20 @@ public class MpmcQueueAltPerfTest {
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
} }
private static long performanceRun(int runNumber, MpmcArrayQueue<Node> queue) throws Exception {
private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception {
// for (int i=0;i<CONCURRENCY_LEVEL;i++) {
Producer p = new Producer(queue); Producer p = new Producer(queue);
Thread thread = new Thread(p); Thread thread = new Thread(p);
thread.start(); // producer will timestamp start thread.start(); // producer will timestamp start
// }
MpmcExchangerQueue consumer = queue; MpmcArrayQueue<Node> consumer = queue;
Object result; Node result;
int queueEmpty = 0;
do { do {
result = consumer.take(); while (null == (result = consumer.poll())) {
} while (0 != --i); } while (0 != --i);
long end = System.nanoTime(); long end = System.nanoTime();
@ -69,29 +69,45 @@ public class MpmcQueueAltPerfTest {
long duration = end - p.start; long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName(); String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, result.item1, queueEmpty, p.queueFull);
return ops; return ops;
} }
private static final Integer val = Integer.valueOf(234); @SuppressWarnings("rawtypes")
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final MpmcExchangerQueue queue; private final MpmcArrayQueue queue;
int queueFull = 0;
long start; long start;
public Producer(MpmcExchangerQueue queue) { public Producer(MpmcArrayQueue queue) {
this.queue = queue; this.queue = queue;
} }
@Override @Override
public void run() { public void run() {
MpmcExchangerQueue producer = this.queue; MpmcArrayQueue producer = this.queue;
int f = 0;
long s = System.nanoTime(); long s = System.nanoTime();
MpmcArrayQueue<Node> pool = new MpmcArrayQueue<Node>(2);
pool.offer(new Node());
pool.offer(new Node());
Node node;
do { do {
producer.put(val); node = pool.poll();
node.item1 = TEST_VALUE;
while (!producer.offer(node)) {
} while (0 != --i); } while (0 != --i);
this.queueFull = f;
this.start = s; this.start = s;
} }
} }

@ -30,7 +30,7 @@ public class MpmcQueueBaselineNodePerfTest {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails()); System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Integer.class).toPrintable()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY); final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY);

View File

@ -6,7 +6,6 @@ package dorkbox.util.messagebus;
import junit.framework.Assert; import junit.framework.Assert;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
@ -19,7 +18,7 @@ public class PerformanceTest {
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static final int CONCURRENCY_LEVEL = 1; public static final int CONCURRENCY_LEVEL = 2;
private static long count = 0; private static long count = 0;
@ -32,72 +31,6 @@ public class PerformanceTest {
}; };
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// testSpeed();
private static void testSpeed() throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final SimpleQueue queue = new SimpleQueue (QUEUE_CAPACITY, 1 << 14);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
results[i] = performanceRun(i, queue);
// only average last 10 results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
sum += results[i];
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception {
// for (int i=0;i<CONCURRENCY_LEVEL;i++) {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
// }
SimpleQueue consumer = queue;
Object result;
do {
result = consumer.take();
} while (0 != --i);
long end = System.nanoTime();
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result);
return ops;
public static class Producer implements Runnable {
private final SimpleQueue queue;
long start;
public Producer(SimpleQueue queue) {
this.queue = queue;
public void run() {
SimpleQueue producer = this.queue;
long s = System.nanoTime();
do {
} while (0 != --i);
this.start = s;
private static void tesCorrectness() {
final MultiMBassador bus = new MultiMBassador(CONCURRENCY_LEVEL); final MultiMBassador bus = new MultiMBassador(CONCURRENCY_LEVEL);
bus.addErrorHandler(TestFailingHandler); bus.addErrorHandler(TestFailingHandler);

@ -0,0 +1,97 @@
* Copyright 2012 Real Logic Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
public class SimpleQueueAltPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final SimpleQueue queue = new SimpleQueue(1<<17);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
results[i] = performanceRun(i, queue);
// only average last 10 results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
sum += results[i];
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
SimpleQueue consumer = queue;
Object result;
do {
result = consumer.take();
} while (0 != --i);
long end = System.nanoTime();
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result);
return ops;
public static class Producer implements Runnable {
private final SimpleQueue queue;
int queueFull = 0;
long start;
public Producer(SimpleQueue queue) {
this.queue = queue;
public void run() {
SimpleQueue producer = this.queue;
long s = System.nanoTime();
try {
do {
} while (0 != --i);
} catch (InterruptedException e) {
this.start = s;