Changed visibility of queue. Made node items private

This commit is contained in:
nathan 2015-06-26 20:59:43 +02:00
parent a1922ff788
commit 894da4c980
8 changed files with 298 additions and 193 deletions

View File

@ -1,9 +1,6 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
@ -25,7 +22,8 @@ import java.util.Collection;
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MessageBus implements IMessageBus {
public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
private final MpmcMultiTransferArrayQueue dispatchQueue;
@ -43,14 +41,16 @@ public class MessageBus implements IMessageBus {
/**
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
*/
public MessageBus() {
public
MessageBus() {
this(Runtime.getRuntime().availableProcessors() / 2);
}
/**
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MessageBus(int numberOfThreads) {
public
MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypes, SubscribeMode.MultiArg, numberOfThreads);
}
@ -58,7 +58,8 @@ public class MessageBus implements IMessageBus {
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) {
public
MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) {
numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads));
this.errorHandler = new DefaultErrorHandler();
@ -115,7 +116,8 @@ public class MessageBus implements IMessageBus {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public void run() {
public
void run() {
MpmcMultiTransferArrayQueue IN_QUEUE = MessageBus.this.dispatchQueue;
MultiNode node = new MultiNode();
@ -124,49 +126,51 @@ public class MessageBus implements IMessageBus {
//noinspection InfiniteLoopStatement
while (true) {
IN_QUEUE.take(node);
switch (node.messageType) {
Integer type = (Integer) MultiNode.lpMessageType(node);
switch (type) {
case 1: {
publish(node.item1);
publish(MultiNode.lpItem1(node));
break;
}
case 2: {
publish(node.item1, node.item2);
publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node));
break;
}
case 3: {
publish(node.item1, node.item2, node.item3);
publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node));
break;
}
default: {
publish(node.item1);
publish(MultiNode.lpItem1(node));
}
}
}
} catch (InterruptedException e) {
if (!MessageBus.this.shuttingDown) {
switch (node.messageType) {
Integer type = (Integer) MultiNode.lpMessageType(node);
switch (type) {
case 1: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1));
MultiNode.lpItem1(node)));
break;
}
case 2: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1, node.item2));
MultiNode.lpItem1(node), MultiNode.lpItem2(node)));
break;
}
case 3: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1, node.item2, node.item3));
MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)));
break;
}
default: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1));
MultiNode.lpItem1(node)));
}
}
}
@ -183,7 +187,8 @@ public class MessageBus implements IMessageBus {
/**
* Always return at least 2 threads
*/
private static int getMinNumberOfThreads(final int numberOfThreads) {
private static
int getMinNumberOfThreads(final int numberOfThreads) {
if (numberOfThreads < 2) {
return 2;
}
@ -191,43 +196,50 @@ public class MessageBus implements IMessageBus {
}
@Override
public void subscribe(final Object listener) {
public
void subscribe(final Object listener) {
MessageBus.this.subscriptionManager.subscribe(listener);
}
@Override
public void unsubscribe(final Object listener) {
public
void unsubscribe(final Object listener) {
MessageBus.this.subscriptionManager.unsubscribe(listener);
}
@Override
public void publish(final Object message) {
public
void publish(final Object message) {
subscriptionPublisher.publish(message);
}
@Override
public void publish(final Object message1, final Object message2) {
public
void publish(final Object message1, final Object message2) {
subscriptionPublisher.publish(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
public
void publish(final Object message1, final Object message2, final Object message3) {
subscriptionPublisher.publish(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) {
public
void publish(final Object[] messages) {
subscriptionPublisher.publish(messages);
}
@Override
public void publishAsync(final Object message) {
public
void publishAsync(final Object message) {
if (message != null) {
try {
this.dispatchQueue.transfer(message, MessageType.ONE);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
}
}
else {
@ -236,13 +248,14 @@ public class MessageBus implements IMessageBus {
}
@Override
public void publishAsync(final Object message1, final Object message2) {
public
void publishAsync(final Object message1, final Object message2) {
if (message1 != null && message2 != null) {
try {
this.dispatchQueue.transfer(message1, message2);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message1, message2));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2));
}
}
else {
@ -251,13 +264,14 @@ public class MessageBus implements IMessageBus {
}
@Override
public void publishAsync(final Object message1, final Object message2, final Object message3) {
public
void publishAsync(final Object message1, final Object message2, final Object message3) {
if (message1 != null || message2 != null | message3 != null) {
try {
this.dispatchQueue.transfer(message1, message2, message3);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message1, message2, message3));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2, message3));
}
}
else {
@ -266,13 +280,14 @@ public class MessageBus implements IMessageBus {
}
@Override
public void publishAsync(final Object[] messages) {
public
void publishAsync(final Object[] messages) {
if (messages != null) {
try {
this.dispatchQueue.transfer(messages, MessageType.ARRAY);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(messages));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(messages));
}
}
else {
@ -281,17 +296,20 @@ public class MessageBus implements IMessageBus {
}
@Override
public final boolean hasPendingMessages() {
public final
boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public final ErrorHandlingSupport getErrorHandler() {
public final
ErrorHandlingSupport getErrorHandler() {
return errorHandler;
}
@Override
public void start() {
public
void start() {
for (Thread t : this.threads) {
t.start();
}
@ -300,7 +318,8 @@ public class MessageBus implements IMessageBus {
}
@Override
public void shutdown() {
public
void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.common.simpleq;
package dorkbox.util.messagebus;
public final class MessageType {
final class MessageType {
public static final int ONE = 1;
public static final int TWO = 2;
public static final int THREE = 3;

View File

@ -1,19 +1,20 @@
package dorkbox.util.messagebus.common.simpleq;
package dorkbox.util.messagebus;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.UnsafeAccess;
import java.util.concurrent.ThreadLocalRandom;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.*;
public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
final
class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
private static final int TYPE_EMPTY = 0;
private static final int TYPE_CONSUMER = 1;
private static final int TYPE_PRODUCER = 2;
/** Is it multi-processor? */
/**
* Is it multi-processor?
*/
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
private static final int INPROGRESS_SPINS = MP ? 32 : 0;
@ -39,14 +40,16 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
private final int consumerCount;
public MpmcMultiTransferArrayQueue(final int consumerCount) {
public
MpmcMultiTransferArrayQueue(final int consumerCount) {
super(1024); // must be power of 2
this.consumerCount = consumerCount;
}
private static final ThreadLocal<Object> nodeThreadLocal = new ThreadLocal<Object>() {
@Override
protected Object initialValue() {
protected
Object initialValue() {
return new MultiNode();
}
};
@ -60,7 +63,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
* The item can be a single object (MessageType.ONE), or an array object (MessageType.ARRAY)
* </p>
*/
public void transfer(final Object item, final int messageType) throws InterruptedException {
public
void transfer(final Object item, final int messageType) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -76,7 +80,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
}
else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
@ -84,7 +89,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
continue;
}
lastType = lpType(previousElement);
lastType = MultiNode.lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
@ -103,11 +108,11 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
MultiNode.spType(node, TYPE_PRODUCER);
MultiNode.spThread(node, myThread);
spMessageType(node, messageType);
spItem1(node, item);
MultiNode.spMessageType(node, messageType);
MultiNode.spItem1(node, item);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
@ -122,7 +127,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
park(node, myThread);
return;
} else {
}
else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
}
@ -148,13 +154,14 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
spMessageType(e, messageType);
spItem1(e, item);
MultiNode.spMessageType(e, messageType);
MultiNode.spItem1(e, item);
unpark(e); // StoreStore
return;
} else {
}
else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
}
@ -165,10 +172,11 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
/**
* PRODUCER method
* <p>
* <p/>
* Place two items in the same slot on the queue, and wait as long as necessary for a corresponding consumer to take it.
*/
public void transfer(final Object item1, final Object item2) throws InterruptedException {
public
void transfer(final Object item1, final Object item2) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -184,7 +192,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
}
else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
@ -192,7 +201,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
continue;
}
lastType = lpType(previousElement);
lastType = MultiNode.lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
@ -211,12 +220,12 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
MultiNode.spType(node, TYPE_PRODUCER);
MultiNode.spThread(node, myThread);
spMessageType(node, MessageType.TWO);
spItem1(node, item1);
spItem2(node, item2);
MultiNode.spMessageType(node, MessageType.TWO);
MultiNode.spItem1(node, item1);
MultiNode.spItem2(node, item2);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
@ -231,7 +240,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
park(node, myThread);
return;
} else {
}
else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
}
@ -257,14 +267,15 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
spMessageType(e, MessageType.TWO);
spItem1(e, item1);
spItem2(e, item2);
MultiNode.spMessageType(e, MessageType.TWO);
MultiNode.spItem1(e, item1);
MultiNode.spItem2(e, item2);
unpark(e); // StoreStore
return;
} else {
}
else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
}
@ -274,10 +285,11 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
/**
* PRODUCER method
* <p>
* <p/>
* Place three items in the same slot on the queue, and wait as long as necessary for a corresponding consumer to take it.
*/
public void transfer(final Object item1, final Object item2, final Object item3) throws InterruptedException {
public
void transfer(final Object item1, final Object item2, final Object item3) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -293,7 +305,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
}
else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
@ -301,7 +314,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
continue;
}
lastType = lpType(previousElement);
lastType = MultiNode.lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
@ -320,13 +333,13 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
MultiNode.spType(node, TYPE_PRODUCER);
MultiNode.spThread(node, myThread);
spMessageType(node, MessageType.THREE);
spItem1(node, item1);
spItem2(node, item2);
spItem3(node, item3);
MultiNode.spMessageType(node, MessageType.THREE);
MultiNode.spItem1(node, item1);
MultiNode.spItem2(node, item2);
MultiNode.spItem3(node, item3);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
@ -341,7 +354,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
park(node, myThread);
return;
} else {
}
else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
}
@ -367,15 +381,16 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
spMessageType(e, MessageType.THREE);
spItem1(e, item1);
spItem2(e, item2);
spItem3(e, item3);
MultiNode.spMessageType(e, MessageType.THREE);
MultiNode.spItem1(e, item1);
MultiNode.spItem2(e, item2);
MultiNode.spItem3(e, item3);
unpark(e); // StoreStore
return;
} else {
}
else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
}
@ -386,15 +401,16 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
/**
* CONSUMER
* <p>
* <p/>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* as long as necessary.
* <p>
* <p/>
* This method does not depend on thread-local for node information, and so is more efficient.
* <p>
* <p/>
* Also, the node used by this method will contain the data.
*/
public void take(final MultiNode node) throws InterruptedException {
public
void take(final MultiNode node) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -410,7 +426,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
}
else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
@ -418,7 +435,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
continue;
}
lastType = lpType(previousElement);
lastType = MultiNode.lpType(previousElement);
}
if (lastType != TYPE_PRODUCER) {
@ -437,8 +454,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
final Thread myThread = Thread.currentThread();
// final Object node = nodeThreadLocal.publish();
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
MultiNode.spType(node, TYPE_CONSUMER);
MultiNode.spThread(node, myThread);
// The unpark thread sets our contents
@ -453,7 +470,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
park(node, myThread);
return;
} else {
}
else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
}
@ -479,15 +497,16 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
node.messageType = lvMessageType(e); // LoadLoad
node.item1 = lpItem1(e);
node.item2 = lpItem2(e);
node.item3 = lpItem3(e);
MultiNode.spMessageType(node, MultiNode.lvMessageType(e)); // LoadLoad
MultiNode.spItem1(node, MultiNode.lpItem1(e));
MultiNode.spItem2(node, MultiNode.lpItem2(e));
MultiNode.spItem3(node, MultiNode.lpItem3(e));
unpark(e);
return;
} else {
}
else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
}
@ -498,7 +517,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// modification of super implementation, as to include a small busySpin on contention
@Override
public boolean offer(Object item) {
public
boolean offer(Object item) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
@ -532,13 +552,15 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return true;
} else {
}
else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
}
else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return false;
}
@ -550,7 +572,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// modification of super implementation, as to include a small busySpin on contention
@Override
public Object poll() {
public
Object poll() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -581,15 +604,17 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
return e;
} else {
}
else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
else if (delta < 0 && // slot has not been moved by producer
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// another consumer beat us and moved sequence ahead, retry 2
@ -597,7 +622,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
}
@Override
public boolean isEmpty() {
public
boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
@ -606,7 +632,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
}
@Override
public Object peek() {
public
Object peek() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -623,7 +650,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
}
@Override
public int size() {
public
int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
@ -641,7 +669,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
}
}
public boolean hasPendingMessages() {
public
boolean hasPendingMessages() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -655,7 +684,8 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
if (consumerIndex == producerIndex) {
return true;
} else {
}
else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
@ -663,49 +693,57 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
continue;
}
return lpType(previousElement) != TYPE_CONSUMER || consumerIndex + this.consumerCount != producerIndex;
return MultiNode.lpType(previousElement) != TYPE_CONSUMER || consumerIndex + this.consumerCount != producerIndex;
}
}
}
private static void busySpin(int spins) {
for (;;) {
private static
void busySpin(int spins) {
for (; ; ) {
if (spins > 0) {
--spins;
} else {
}
else {
return;
}
}
}
@SuppressWarnings("null")
private void park(final Object node, final Thread myThread) throws InterruptedException {
private
void park(final Object node, final Thread myThread) throws InterruptedException {
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
if (lvThread(node) == null) {
for (; ; ) {
if (MultiNode.lvThread(node) == null) {
return;
} else if (myThread.isInterrupted()) {
}
else if (myThread.isInterrupted()) {
throw new InterruptedException();
} else if (spins < 0) {
}
else if (spins < 0) {
spins = PARK_UNTIMED_SPINS;
randomYields = ThreadLocalRandom.current();
} else if (spins > 0) {
}
else if (spins > 0) {
if (randomYields.nextInt(1024) == 0) {
UnsafeAccess.UNSAFE.park(false, 1L);
}
--spins;
} else {
}
else {
// park can return for NO REASON (must check for thread values)
UnsafeAccess.UNSAFE.park(false, 0L);
}
}
}
private void unpark(Object node) {
final Object thread = lpThread(node);
soThread(node, null); // StoreStore
private
void unpark(Object node) {
final Object thread = MultiNode.lpThread(node);
MultiNode.soThread(node, null); // StoreStore
UnsafeAccess.UNSAFE.unpark(thread);
}
}

View File

@ -1,14 +1,14 @@
package dorkbox.util.messagebus.common.simpleq;
package dorkbox.util.messagebus;
import org.jctools.util.UnsafeAccess;
abstract class ColdItems {
public int type = 0;
private int type = 0;
public int messageType = MessageType.ONE;
public Object item1 = null;
public Object item2 = null;
public Object item3 = null;
private int messageType = MessageType.ONE;
private Object item1 = null;
private Object item2 = null;
private Object item3 = null;
}
abstract class Pad0 extends ColdItems {
@ -17,7 +17,7 @@ abstract class Pad0 extends ColdItems {
}
abstract class HotItem1 extends Pad0 {
public Thread thread;
private Thread thread;
}
public class MultiNode extends HotItem1 {
@ -32,13 +32,13 @@ public class MultiNode extends HotItem1 {
static {
try {
TYPE = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("type"));
TYPE = UnsafeAccess.UNSAFE.objectFieldOffset(ColdItems.class.getDeclaredField("type"));
MESSAGETYPE = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("messageType"));
ITEM1 = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("item1"));
ITEM2 = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("item2"));
ITEM3 = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("item3"));
THREAD = UnsafeAccess.UNSAFE.objectFieldOffset(MultiNode.class.getField("thread"));
MESSAGETYPE = UnsafeAccess.UNSAFE.objectFieldOffset(ColdItems.class.getDeclaredField("messageType"));
ITEM1 = UnsafeAccess.UNSAFE.objectFieldOffset(ColdItems.class.getDeclaredField("item1"));
ITEM2 = UnsafeAccess.UNSAFE.objectFieldOffset(ColdItems.class.getDeclaredField("item2"));
ITEM3 = UnsafeAccess.UNSAFE.objectFieldOffset(ColdItems.class.getDeclaredField("item3"));
THREAD = UnsafeAccess.UNSAFE.objectFieldOffset(HotItem1.class.getDeclaredField("thread"));
// now make sure we can access UNSAFE
MultiNode node = new MultiNode();

View File

@ -0,0 +1,35 @@
package dorkbox.util.messagebus;
/**
*
*/
public
class MTAQ_Accessor {
MpmcMultiTransferArrayQueue mpmcMultiTransferArrayQueue;
public
MTAQ_Accessor(final int consumerCount) {
mpmcMultiTransferArrayQueue = new MpmcMultiTransferArrayQueue(consumerCount);
}
public
Object poll() {
return mpmcMultiTransferArrayQueue.poll();
}
public
boolean offer(final Object item) {
return mpmcMultiTransferArrayQueue.offer(item);
}
public
void take(final MultiNode node) throws InterruptedException {
mpmcMultiTransferArrayQueue.take(node);
}
public
void transfer(final Object item, final int type) throws InterruptedException {
mpmcMultiTransferArrayQueue.transfer(item, type);
}
}

View File

@ -1,25 +1,27 @@
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.MTAQ_Accessor;
public class PerfTest_MpmcTransferArrayQueue {
public
class PerfTest_MpmcTransferArrayQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
public static void main(final String[] args) throws Exception {
public static
void main(final String[] args) throws Exception {
final int repetitions = 50_000_00;
final int warmupRuns = 4;
final int runs = 3;
for (int concurrency = 1; concurrency < 5; concurrency++) {
final MpmcMultiTransferArrayQueue queue = new MpmcMultiTransferArrayQueue(concurrency);
final MTAQ_Accessor queue = new MTAQ_Accessor(concurrency);
long average = PerfTest_MpmcTransferArrayQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, repetitions);
System.out.format("PerfTest_MpmcTransferArrayQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
}
for (int concurrency = 1; concurrency < 5; concurrency++) {
final MpmcMultiTransferArrayQueue queue = new MpmcMultiTransferArrayQueue(concurrency);
final MTAQ_Accessor queue = new MTAQ_Accessor(concurrency);
long average = PerfTest_MpmcTransferArrayQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, repetitions);
System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
}

View File

@ -1,10 +1,10 @@
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.MTAQ_Accessor;
import dorkbox.util.messagebus.MultiNode;
public class PerfTest_MpmcTransferArrayQueue_Block {
public
class PerfTest_MpmcTransferArrayQueue_Block {
// static {
// System.setProperty("sparse.shift", "2");
// }
@ -14,7 +14,8 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception {
public static
void main(final String[] args) throws Exception {
// System.out.println(VMSupport.vmDetails());
// System.out.println(ClassLayout.parseClass(MultiNode.class).toPrintable());
@ -23,7 +24,7 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
final int warmupRuns = 4;
final int runs = 5;
final MpmcMultiTransferArrayQueue queue = new MpmcMultiTransferArrayQueue(concurrency);
final MTAQ_Accessor queue = new MTAQ_Accessor(concurrency);
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
// SimpleQueue.INPROGRESS_SPINS = 64;
@ -49,7 +50,9 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, MpmcMultiTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
public static
long averageRun(int warmUpRuns, int sumCount, MTAQ_Accessor queue, boolean showStats, int concurrency, int repetitions)
throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
@ -62,39 +65,40 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
sum += results[i];
}
return sum/sumCount;
return sum / sumCount;
}
private static long performanceRun(int runNumber, MpmcMultiTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
private static
long performanceRun(int runNumber, MTAQ_Accessor queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
Thread[] threads = new Thread[concurrency * 2];
for (int i=0;i<concurrency;i++) {
for (int i = 0; i < concurrency; i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].start();
threads[i+1].start();
threads[i + 1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].join();
threads[i+1].join();
threads[i + 1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
for (int i = 0; i < concurrency; i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
@ -115,25 +119,28 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
return ops;
}
public static class Producer implements Runnable {
private final MpmcMultiTransferArrayQueue queue;
public static
class Producer implements Runnable {
private final MTAQ_Accessor queue;
volatile long start;
private int repetitions;
public Producer(MpmcMultiTransferArrayQueue queue, int repetitions) {
public
Producer(MTAQ_Accessor queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcMultiTransferArrayQueue producer = this.queue;
public
void run() {
MTAQ_Accessor producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE, MessageType.ONE);
producer.transfer(TEST_VALUE, 1);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
@ -141,20 +148,24 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
}
}
public static class Consumer implements Runnable {
private final MpmcMultiTransferArrayQueue queue;
public static
class Consumer implements Runnable {
private final MTAQ_Accessor queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(MpmcMultiTransferArrayQueue queue, int repetitions) {
public
Consumer(MTAQ_Accessor queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcMultiTransferArrayQueue consumer = this.queue;
public
void run() {
MTAQ_Accessor consumer = this.queue;
int i = this.repetitions;
MultiNode node = new MultiNode();

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.MTAQ_Accessor;
public class PerfTest_MpmcTransferArrayQueue_NonBlock {
public static final int REPETITIONS = 50 * 1000 * 100;
@ -18,7 +18,7 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
long average = 0;
final MpmcMultiTransferArrayQueue queue = new MpmcMultiTransferArrayQueue(concurrency);
final MTAQ_Accessor queue = new MTAQ_Accessor(concurrency);
average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
// SimpleQueue.INPROGRESS_SPINS = 64;
@ -44,7 +44,7 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, MpmcMultiTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
public static long averageRun(int warmUpRuns, int sumCount, MTAQ_Accessor queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
@ -60,7 +60,7 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
return sum/sumCount;
}
private static long performanceRun(int runNumber, MpmcMultiTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
private static long performanceRun(int runNumber, MTAQ_Accessor queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
@ -111,18 +111,18 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
}
public static class Producer implements Runnable {
private final MpmcMultiTransferArrayQueue queue;
private final MTAQ_Accessor queue;
volatile long start;
private int repetitions;
public Producer(MpmcMultiTransferArrayQueue queue, int repetitions) {
public Producer(MTAQ_Accessor queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcMultiTransferArrayQueue producer = this.queue;
MTAQ_Accessor producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
@ -135,19 +135,19 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
}
public static class Consumer implements Runnable {
private final MpmcMultiTransferArrayQueue queue;
private final MTAQ_Accessor queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(MpmcMultiTransferArrayQueue queue, int repetitions) {
public Consumer(MTAQ_Accessor queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcMultiTransferArrayQueue consumer = this.queue;
MTAQ_Accessor consumer = this.queue;
Object result = null;
int i = this.repetitions;