Cleaned up tests, added Thread interruption back, code polish

This commit is contained in:
nathan 2015-05-09 00:09:48 +02:00
parent 7fa59650a8
commit 3aa5ca59f6
13 changed files with 806 additions and 1105 deletions

View File

@ -11,7 +11,7 @@ import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
@ -25,17 +25,13 @@ import dorkbox.util.messagebus.subscription.Subscription;
*/ */
public class MultiMBassador implements IMessageBus { public class MultiMBassador implements IMessageBus {
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling // this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>(); private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
// private final TransferQueue<Runnable> dispatchQueue;
private final MpmcTransferArrayQueue dispatchQueue; private final MpmcTransferArrayQueue dispatchQueue;
private final SubscriptionManager subscriptionManager; private final SubscriptionManager subscriptionManager;
// all threads that are available for asynchronous message dispatching
// private final int numberOfThreads;
private final Collection<Thread> threads; private final Collection<Thread> threads;
/** /**
@ -45,6 +41,11 @@ public class MultiMBassador implements IMessageBus {
*/ */
private final boolean forceExactMatches = false; private final boolean forceExactMatches = false;
/**
* Notifies the consumers that the messagebus is shutting down.
*/
private volatile boolean shuttingDown;
/** /**
* By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages * By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages
@ -73,9 +74,6 @@ public class MultiMBassador implements IMessageBus {
} }
numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads); numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads);
// this.numberOfThreads = numberOfThreads;
// this.dispatchQueue = new LinkedTransferQueue<Runnable>();
this.dispatchQueue = new MpmcTransferArrayQueue(numberOfThreads); this.dispatchQueue = new MpmcTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads);
@ -88,12 +86,22 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public void run() { public void run() {
MpmcTransferArrayQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; MpmcTransferArrayQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
Object message1; Object message1 = null;
while (true) { while (!MultiMBassador.this.shuttingDown) {
message1 = IN_QUEUE.take(); try {
publish(message1); while (true) {
message1 = IN_QUEUE.take();
publish(message1);
}
} catch (InterruptedException e) {
if (!MultiMBassador.this.shuttingDown) {
handlePublicationError(new PublicationError()
.setMessage("Thread interupted while processing message")
.setCause(e)
.setPublishedObject(message1));
}
}
} }
} }
}; };
@ -132,12 +140,12 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public boolean hasPendingMessages() { public boolean hasPendingMessages() {
// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
return this.dispatchQueue.hasPendingMessages(); return this.dispatchQueue.hasPendingMessages();
} }
@Override @Override
public void shutdown() { public void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) { for (Thread t : this.threads) {
t.interrupt(); t.interrupt();
} }
@ -396,22 +404,14 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public void publishAsync(final Object message) { public void publishAsync(final Object message) {
if (message != null) { if (message != null) {
// Runnable runnable = new Runnable() { try {
// @Override this.dispatchQueue.transfer(message);
// public void run() { } catch (InterruptedException e) {
// MultiMBassador.this.publish(message); handlePublicationError(new PublicationError()
// } .setMessage("Error while adding an asynchronous message")
// }; .setCause(e)
.setPublishedObject(message));
// try { }
// this.dispatchQueue.transfer(runnable);
this.dispatchQueue.transfer(message);
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// }
} }
} }

View File

@ -1,26 +1,23 @@
package dorkbox.util.messagebus.common.simpleq.jctools; package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1; import static dorkbox.util.messagebus.common.simpleq.Node.lpItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread; import static dorkbox.util.messagebus.common.simpleq.Node.lpThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType; import static dorkbox.util.messagebus.common.simpleq.Node.lpType;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1; import static dorkbox.util.messagebus.common.simpleq.Node.lvItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread; import static dorkbox.util.messagebus.common.simpleq.Node.lvThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1; import static dorkbox.util.messagebus.common.simpleq.Node.soItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; import static dorkbox.util.messagebus.common.simpleq.Node.soThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; import static dorkbox.util.messagebus.common.simpleq.Node.spItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; import static dorkbox.util.messagebus.common.simpleq.Node.spThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; import static dorkbox.util.messagebus.common.simpleq.Node.spType;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import org.jctools.queues.MpmcArrayQueue; import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.UnsafeAccess; import org.jctools.util.UnsafeAccess;
public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> implements TransferQueue<Object> { public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> {
private static final int TYPE_EMPTY = 0; private static final int TYPE_EMPTY = 0;
private static final int TYPE_CONSUMER = 1; private static final int TYPE_CONSUMER = 1;
private static final int TYPE_PRODUCER = 2; private static final int TYPE_PRODUCER = 2;
@ -29,8 +26,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
private static final int INPROGRESS_SPINS = MP ? 32 : 0; private static final int INPROGRESS_SPINS = MP ? 32 : 0;
private static final int PUSH_SPINS = MP ? 32 : 0; private static final int PRODUCER_CAS_FAIL_SPINS = MP ? 512 : 0;
private static final int POP_SPINS = MP ? 512 : 0; private static final int CONSUMER_CAS_FAIL_SPINS = MP ? 512 : 0;
/** /**
@ -49,12 +46,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
*/ */
private static final int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; private static final int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
private static final long SPIN_THRESHOLD = 1000L;
private final int consumerCount; private final int consumerCount;
public MpmcTransferArrayQueue(final int consumerCount) { public MpmcTransferArrayQueue(final int consumerCount) {
@ -75,19 +66,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
* <p> * <p>
* Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary. * Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary.
*/ */
@Override public final void transfer(final Object item) throws InterruptedException {
public final void transfer(final Object item) {
producerXfer(item, false, 0L);
}
/**
* CONSUMER
* <p>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* as long as necessary
*/
@Override
public final Object take() {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer; final Object[] buffer = this.buffer;
@ -104,7 +83,111 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY; lastType = TYPE_EMPTY;
} else { } else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
// TYPE_EMPTY, TYPE_PRODUCER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(buffer, offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
park(node, myThread);
return;
} else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
}
}
}
else {
// TYPE_CONSUMER
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) {
if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(buffer, offset);
spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
soItem1(e, item);
unpark(e);
return;
} else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
}
}
}
}
}
/**
* CONSUMER
* <p>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* as long as necessary
*/
public final Object take() throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -143,16 +226,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
park(node, myThread, false, 0L); park(node, myThread);
Object item1 = lvItem1(node); Object item1 = lvItem1(node);
return item1; return item1;
} else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
} }
} }
}
// whoops, inconsistent state else {
busySpin(PUSH_SPINS);
} else {
// TYPE_PRODUCER // TYPE_PRODUCER
// complimentary mode = pop+unpark off queue // complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask); long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
@ -177,11 +260,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
unpark(e); unpark(e);
return lvItem1; return lvItem1;
} else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
} }
} }
// whoops, inconsistent state
busySpin(POP_SPINS);
} }
} }
} }
@ -190,9 +272,11 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
* CONSUMER * CONSUMER
* <p> * <p>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* as long as necessary * as long as necessary.
* <p>
* This method does not depend on thread-local for node information, and so is more efficient
*/ */
public final Object take(final Node node) { public final Object take(final Node node) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
final Object[] buffer = this.buffer; final Object[] buffer = this.buffer;
@ -209,7 +293,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY; lastType = TYPE_EMPTY;
} else { } else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -248,16 +332,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
// (seeing this value from a producer will lead to retry 2) // (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
park(node, myThread, false, 0L); park(node, myThread);
Object item1 = lvItem1(node); Object item1 = lvItem1(node);
return item1; return item1;
} else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
} }
} }
}
// whoops, inconsistent state else {
busySpin(PUSH_SPINS);
} else {
// TYPE_PRODUCER // TYPE_PRODUCER
// complimentary mode = pop+unpark off queue // complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask); long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
@ -282,11 +366,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
unpark(e); unpark(e);
return lvItem1; return lvItem1;
} else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
} }
} }
// whoops, inconsistent state
busySpin(POP_SPINS);
} }
} }
} }
@ -328,6 +411,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return true; return true;
} else {
busySpin(PRODUCER_CAS_FAIL_SPINS);
} }
// failed cas, retry 1 // failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward } else if (delta < 0 && // poll has not moved this value forward
@ -338,130 +423,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
} }
// another producer has moved the sequence by one, retry 2 // another producer has moved the sequence by one, retry 2
busySpin(PUSH_SPINS);
} }
} }
/**
* Will busy spin until timeout
*/
@Override
public boolean offer(Object item, long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
long lastTime = System.nanoTime();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long producerIndex;
long pSeqOffset;
long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
producerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(buffer, offset, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return true;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
long now = System.nanoTime();
long remaining = nanos -= now - lastTime;
lastTime = now;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
UnsafeAccess.UNSAFE.park(false, 1L);
}
} else {
return false;
}
}
// another producer has moved the sequence by one, retry 2
busySpin(PUSH_SPINS);
}
}
@Override
public void put(Object item) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long producerIndex;
long pSeqOffset;
long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
producerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(buffer, offset, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return false;
busySpin(PUSH_SPINS);
}
// another producer has moved the sequence by one, retry 2
busySpin(PUSH_SPINS);
}
}
// modification of super implementation, as to include a small busySpin on contention // modification of super implementation, as to include a small busySpin on contention
@Override @Override
@ -496,6 +460,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
return e; return e;
} else {
busySpin(CONSUMER_CAS_FAIL_SPINS);
} }
// failed cas, retry 1 // failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer } else if (delta < 0 && // slot has not been moved by producer
@ -506,7 +472,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
} }
// another consumer beat us and moved sequence ahead, retry 2 // another consumer beat us and moved sequence ahead, retry 2
busySpin(POP_SPINS);
} }
} }
@ -555,261 +520,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
} }
} }
@Override
public boolean tryTransfer(Object item) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
// TYPE_EMPTY, TYPE_PRODUCER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// don't add to queue, abort!!
return false;
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
} else {
// TYPE_CONSUMER
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) {
if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(buffer, offset);
spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
soItem1(e, item);
unpark(e);
return true;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
}
}
}
@Override
public boolean tryTransfer(Object item, long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
return producerXfer(item, true, nanos);
}
@Override
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
long lastTime = System.nanoTime();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long cSeqOffset;
long producerIndex = -1; // start with bogus value, hope we don't need it
while (true) {
consumerIndex = lvConsumerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) {
if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(buffer, offset);
spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
return e;
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
long now = System.nanoTime();
long remaining = nanos -= now - lastTime;
lastTime = now;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
UnsafeAccess.UNSAFE.park(false, 1L);
}
// make sure to continue here (so we don't spin twice)
continue;
} else {
return null;
}
}
// another consumer beat us and moved sequence ahead, retry 2
busySpin(POP_SPINS);
}
}
@Override
public int remainingCapacity() {
// TODO Auto-generated method stub
return 0;
}
@Override
public int drainTo(Collection c) {
// TODO Auto-generated method stub
return 0;
}
@Override
public int drainTo(Collection c, int maxElements) {
// TODO Auto-generated method stub
return 0;
}
@Override
public Object[] toArray(Object[] a) {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean containsAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean addAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean removeAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean retainAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean hasWaitingConsumer() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long consumerIndex;
long producerIndex;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
return false;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
return lpType(previousElement) == TYPE_CONSUMER;
}
}
}
@Override
public int getWaitingConsumerCount() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
long consumerIndex;
long producerIndex;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
return 0;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
if (lpType(previousElement) == TYPE_CONSUMER) {
return (int) (producerIndex - consumerIndex);
} else {
return 0;
}
}
}
}
public final boolean hasPendingMessages() { public final boolean hasPendingMessages() {
// local load of field to avoid repeated loads after volatile reads // local load of field to avoid repeated loads after volatile reads
final long mask = this.mask; final long mask = this.mask;
@ -825,7 +535,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
if (consumerIndex == producerIndex) { if (consumerIndex == producerIndex) {
return true; return true;
} else { } else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask));
if (previousElement == null) { if (previousElement == null) {
// the last producer hasn't finished setting the object yet // the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS); busySpin(INPROGRESS_SPINS);
@ -847,48 +557,24 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
} }
} }
/**
* @return true if the park was correctly unparked. false if there was an interruption or a timed-park expired
*/
@SuppressWarnings("null") @SuppressWarnings("null")
private final boolean park(final Object node, final Thread myThread, final boolean timed, long nanos) { private final void park(final Object node, final Thread myThread) throws InterruptedException {
long lastTime = System.nanoTime();
int spins = -1; // initialized after first item and cancel checks int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed ThreadLocalRandom randomYields = null; // bound if needed
for (;;) { for (;;) {
if (lvThread(node) == null) { if (lvThread(node) == null) {
return true; return;
} else if (myThread.isInterrupted() || timed && nanos <= 0) { } else if (myThread.isInterrupted()) {
return false; throw new InterruptedException();
} else if (spins < 0) { } else if (spins < 0) {
if (timed) { spins = PARK_UNTIMED_SPINS;
spins = PARK_TIMED_SPINS;
} else {
spins = PARK_UNTIMED_SPINS;
}
randomYields = ThreadLocalRandom.current(); randomYields = ThreadLocalRandom.current();
} else if (spins > 0) { } else if (spins > 0) {
if (randomYields.nextInt(1024) == 0) { if (randomYields.nextInt(1024) == 0) {
UnsafeAccess.UNSAFE.park(false, 1L); UnsafeAccess.UNSAFE.park(false, 1L);
} }
--spins; --spins;
} else if (timed) {
long now = System.nanoTime();
long remaining = nanos -= now - lastTime;
lastTime = now;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
// a park is too slow for this time, so just busy spin instead
busySpin(PARK_UNTIMED_SPINS);
} else {
UnsafeAccess.UNSAFE.park(false, nanos);
}
} else {
return false;
}
} else { } else {
// park can return for NO REASON (must check for thread values) // park can return for NO REASON (must check for thread values)
UnsafeAccess.UNSAFE.park(false, 0L); UnsafeAccess.UNSAFE.park(false, 0L);
@ -901,105 +587,4 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> impleme
soThread(node, null); soThread(node, null);
UnsafeAccess.UNSAFE.unpark(thread); UnsafeAccess.UNSAFE.unpark(thread);
} }
/**
* @return true if the producer successfully transferred an item (either through waiting for a consumer that took it, or transferring
* directly to an already waiting consumer). false if the thread was interrupted, or it was timed and has expired.
*/
private final boolean producerXfer(final Object item, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
} else {
final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
if (lastType != TYPE_CONSUMER) {
// TYPE_EMPTY, TYPE_PRODUCER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
final long newProducerIndex = producerIndex + 1;
if (casProducerIndex(producerIndex, newProducerIndex)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(buffer, offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore
return park(node, myThread, timed, nanos);
}
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
} else {
// TYPE_CONSUMER
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long newConsumerIndex = consumerIndex + 1;
final long delta = seq - newConsumerIndex;
if (delta == 0) {
if (casConsumerIndex(consumerIndex, newConsumerIndex)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(buffer, offset);
spElement(buffer, offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
soItem1(e, item);
unpark(e);
return true;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
}
}
}
} }

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus.common.simpleq.jctools; package dorkbox.util.messagebus.common.simpleq;
import org.jctools.util.UnsafeAccess; import org.jctools.util.UnsafeAccess;

View File

@ -1,149 +1,26 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue { public class PerfTest_LinkedBlockingQueue {
// 15 == 32 * 1024 public static final int REPETITIONS = 50 * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); final int warmupRuns = 2;
final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17); final int runs = 3;
final long[] results = new long[20]; for (int concurrency = 1; concurrency < 5; concurrency++) {
for (int i = 0; i < 20; i++) { final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
System.gc(); long average = PerfTest_LinkedBlockingQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
results[i] = performanceRun(i, queue); System.out.format("PerfTest_LinkedBlockingQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
}
// only average last 10 results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
sum += results[i];
}
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue);
consumers[i] = new Consumer(queue);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int concurrency = 1; concurrency < 5; concurrency++) {
threads[j] = new Thread(producers[i], "Producer " + i); final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
threads[j+1] = new Thread(consumers[i], "Consumer " + i); long average = PerfTest_LinkedBlockingQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
} System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
return ops;
}
public static class Producer implements Runnable {
private final LinkedBlockingQueue queue;
volatile long start;
public Producer(LinkedBlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
LinkedBlockingQueue producer = this.queue;
int i = REPETITIONS;
this.start = System.nanoTime();
try {
do {
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}
}
public static class Consumer implements Runnable {
private final LinkedBlockingQueue queue;
Object result;
volatile long end;
public Consumer(LinkedBlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
LinkedBlockingQueue consumer = this.queue;
Object result = null;
int i = REPETITIONS;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
this.result = result;
this.end = System.nanoTime();
} }
} }
} }

View File

@ -0,0 +1,144 @@
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue_Block {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(concurrency);
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static class Producer implements Runnable {
private final LinkedBlockingQueue queue;
volatile long start;
private int repetitions;
public Producer(LinkedBlockingQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedBlockingQueue producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class Consumer implements Runnable {
private final LinkedBlockingQueue queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(LinkedBlockingQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedBlockingQueue consumer = this.queue;
Object result = null;
int i = this.repetitions;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,57 +1,54 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue_NonBlock { public class PerfTest_LinkedBlockingQueue_NonBlock {
// 15 == 32 * 1024 public static final int REPETITIONS = 50 * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 2; private static final int concurrency = 4;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17);
final long[] results = new long[20]; final int warmupRuns = 5;
for (int i = 0; i < 20; i++) { final int runs = 5;
System.gc();
results[i] = performanceRun(i, queue); long average = 0;
}
// only average last 10 results for summary final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
long sum = 0; average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
for (int i = 10; i < 20; i++) {
sum += results[i]; System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
} }
private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception { public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency]; Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2]; Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) { for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue); producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue); consumers[i] = new Consumer(queue, repetitions);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int j=0,i=0;i<concurrency;i++,j+=2) {
@ -84,25 +81,29 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
long duration = end - start; long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName(); String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops; return ops;
} }
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final LinkedBlockingQueue queue; private final LinkedBlockingQueue queue;
volatile long start; volatile long start;
private int repetitions;
public Producer(LinkedBlockingQueue queue) { public Producer(LinkedBlockingQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedBlockingQueue producer = this.queue; LinkedBlockingQueue producer = this.queue;
int i = REPETITIONS; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();
do { do {
@ -117,16 +118,18 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
private final LinkedBlockingQueue queue; private final LinkedBlockingQueue queue;
Object result; Object result;
volatile long end; volatile long end;
private int repetitions;
public Consumer(LinkedBlockingQueue queue) { public Consumer(LinkedBlockingQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedBlockingQueue consumer = this.queue; LinkedBlockingQueue consumer = this.queue;
Object result = null; Object result = null;
int i = REPETITIONS; int i = this.repetitions;
do { do {
while (null == (result = consumer.poll())) { while (null == (result = consumer.poll())) {

View File

@ -1,142 +1,28 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue { public class PerfTest_LinkedTransferQueue {
// 15 == 32 * 1024 public static final int REPETITIONS = 50 * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 1;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); System.out.println("reps:" + REPETITIONS);
final LinkedTransferQueue queue = new LinkedTransferQueue();
final long[] results = new long[20]; final int warmupRuns = 2;
for (int i = 0; i < 20; i++) { final int runs = 3;
System.gc();
results[i] = performanceRun(i, queue);
}
// only average last 10 results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
sum += results[i];
}
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { for (int concurrency = 1; concurrency < 5; concurrency++) {
final LinkedTransferQueue queue = new LinkedTransferQueue();
Producer[] producers = new Producer[concurrency]; long average = PerfTest_LinkedTransferQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
Consumer[] consumers = new Consumer[concurrency]; System.out.format("PerfTest_LinkedTransferQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue);
consumers[i] = new Consumer(queue);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int concurrency = 1; concurrency < 5; concurrency++) {
threads[j] = new Thread(producers[i], "Producer " + i); final LinkedTransferQueue queue = new LinkedTransferQueue();
threads[j+1] = new Thread(consumers[i], "Consumer " + i); long average = PerfTest_LinkedTransferQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
} System.out.format("PerfTest_LinkedTransferQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
return ops;
}
public static class Producer implements Runnable {
private final LinkedTransferQueue queue;
volatile long start;
public Producer(LinkedTransferQueue queue) {
this.queue = queue;
}
@Override
public void run() {
LinkedTransferQueue producer = this.queue;
int i = REPETITIONS;
this.start = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class Consumer implements Runnable {
private final LinkedTransferQueue queue;
Object result;
volatile long end;
public Consumer(LinkedTransferQueue queue) {
this.queue = queue;
}
@Override
public void run() {
LinkedTransferQueue consumer = this.queue;
Object result = null;
int i = REPETITIONS;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
} }
} }
} }

View File

@ -0,0 +1,149 @@
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue_Block {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
final LinkedTransferQueue queue = new LinkedTransferQueue();
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static class Producer implements Runnable {
private final LinkedTransferQueue queue;
volatile long start;
private int repetitions;
public Producer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}
}
public static class Consumer implements Runnable {
private final LinkedTransferQueue queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue consumer = this.queue;
Object result = null;
int i = this.repetitions;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,54 +1,54 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue_NonBlock { public class PerfTest_LinkedTransferQueue_NonBlock {
// 15 == 32 * 1024 public static final int REPETITIONS = 50 * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 4; private static final int concurrency = 4;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final LinkedTransferQueue queue = new LinkedTransferQueue();
final long[] results = new long[20]; final int warmupRuns = 5;
for (int i = 0; i < 20; i++) { final int runs = 5;
System.gc();
results[i] = performanceRun(i, queue); long average = 0;
}
// only average last 10 results for summary final LinkedTransferQueue queue = new LinkedTransferQueue();
long sum = 0; average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
for (int i = 10; i < 20; i++) {
sum += results[i]; System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
} }
private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency]; Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2]; Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) { for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue); producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue); consumers[i] = new Consumer(queue, repetitions);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int j=0,i=0;i<concurrency;i++,j+=2) {
@ -81,29 +81,35 @@ public class PerfTest_LinkedTransferQueue_NonBlock {
long duration = end - start; long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName(); String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops; return ops;
} }
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final LinkedTransferQueue queue; private final LinkedTransferQueue queue;
volatile long start; volatile long start;
private int repetitions;
public Producer(LinkedTransferQueue queue) { public Producer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedTransferQueue producer = this.queue; LinkedTransferQueue producer = this.queue;
int i = REPETITIONS; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();
do { do {
producer.put(TEST_VALUE); while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i); } while (0 != --i);
} }
} }
@ -112,20 +118,22 @@ public class PerfTest_LinkedTransferQueue_NonBlock {
private final LinkedTransferQueue queue; private final LinkedTransferQueue queue;
Object result; Object result;
volatile long end; volatile long end;
private int repetitions;
public Consumer(LinkedTransferQueue queue) { public Consumer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedTransferQueue consumer = this.queue; LinkedTransferQueue consumer = this.queue;
Object result = null; Object result = null;
int i = REPETITIONS; int i = this.repetitions;
do { do {
while((result = consumer.poll()) == null) { while (null == (result = consumer.poll())) {
Thread.yield(); Thread.yield();
} }
} while (0 != --i); } while (0 != --i);

View File

@ -17,7 +17,7 @@ package dorkbox.util.messagebus;
import org.jctools.queues.MpmcArrayQueue; import org.jctools.queues.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node; import dorkbox.util.messagebus.common.simpleq.Node;
public class PerfTest_MpmcArrayQueue_Baseline_Node { public class PerfTest_MpmcArrayQueue_Baseline_Node {
// 15 == 32 * 1024 // 15 == 32 * 1024

View File

@ -1,168 +1,28 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout; import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class PerfTest_MpmcTransferArrayQueue { public class PerfTest_MpmcTransferArrayQueue {
// static {
// System.setProperty("sparse.shift", "2");
// }
public static final int REPETITIONS = 50 * 1000 * 100; public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS);
System.out.println(VMSupport.vmDetails()); final int warmupRuns = 2;
System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); final int runs = 3;
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); for (int concurrency = 1; concurrency < 5; concurrency++) {
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
final int warmupRuns = 4; long average = PerfTest_MpmcTransferArrayQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
final int runs = 5; System.out.format("PerfTest_MpmcTransferArrayQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
long average = 0;
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
average = averageRun(warmupRuns, runs, queue);
// SimpleQueue.INPROGRESS_SPINS = 64;
// SimpleQueue.POP_SPINS = 512;
// SimpleQueue.PUSH_SPINS = 512;
// SimpleQueue.PARK_SPINS = 512;
//
// for (int i = 128; i< 10000;i++) {
// int full = 2*i;
//
// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
// SimpleQueue.PARK_SPINS = full;
//
//
// long newAverage = averageRun(warmupRuns, runs, queue);
// if (newAverage > average) {
// average = newAverage;
// System.err.println("Best value: " + i + " : " + newAverage);
// }
// }
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
} }
return sum/sumCount; for (int concurrency = 1; concurrency < 5; concurrency++) {
} final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
long average = PerfTest_MpmcTransferArrayQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue);
consumers[i] = new Consumer(queue);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = REPETITIONS * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
return ops;
}
public static class Producer implements Runnable {
private final MpmcTransferArrayQueue queue;
volatile long start;
public Producer(MpmcTransferArrayQueue queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcTransferArrayQueue producer = this.queue;
int i = REPETITIONS;
this.start = System.nanoTime();
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
}
}
public static class Consumer implements Runnable {
private final MpmcTransferArrayQueue queue;
Object result;
volatile long end;
public Consumer(MpmcTransferArrayQueue queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcTransferArrayQueue consumer = this.queue;
Object result = null;
int i = REPETITIONS;
Node node = new Node();
do {
result = consumer.take(node);
} while (0 != --i);
this.result = result;
this.end = System.nanoTime();
} }
} }
} }

View File

@ -0,0 +1,181 @@
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.Node;
public class PerfTest_MpmcTransferArrayQueue_Block {
// static {
// System.setProperty("sparse.shift", "2");
// }
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
// SimpleQueue.INPROGRESS_SPINS = 64;
// SimpleQueue.POP_SPINS = 512;
// SimpleQueue.PUSH_SPINS = 512;
// SimpleQueue.PARK_SPINS = 512;
//
// for (int i = 128; i< 10000;i++) {
// int full = 2*i;
//
// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
// SimpleQueue.PARK_SPINS = full;
//
//
// long newAverage = averageRun(warmupRuns, runs, queue);
// if (newAverage > average) {
// average = newAverage;
// System.err.println("Best value: " + i + " : " + newAverage);
// }
// }
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static class Producer implements Runnable {
private final MpmcTransferArrayQueue queue;
volatile long start;
private int repetitions;
public Producer(MpmcTransferArrayQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcTransferArrayQueue producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}
}
public static class Consumer implements Runnable {
private final MpmcTransferArrayQueue queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(MpmcTransferArrayQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
MpmcTransferArrayQueue consumer = this.queue;
Object result = null;
int i = this.repetitions;
Node node = new Node();
try {
do {
result = consumer.take(node);
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,23 +1,25 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue;
public class PerfTest_MpmcTransferArrayQueue_NonBlock { public class PerfTest_MpmcTransferArrayQueue_NonBlock {
public static final int REPETITIONS = 50 * 1000 * 1000; public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17; public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 1; private static final int concurrency = 4;
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 5; final int warmupRuns = 5;
final int runs = 5; final int runs = 5;
long average = 0; long average = 0;
final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency, QUEUE_CAPACITY); final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency);
average = averageRun(warmupRuns, runs, queue); average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
// SimpleQueue.INPROGRESS_SPINS = 64; // SimpleQueue.INPROGRESS_SPINS = 64;
// SimpleQueue.POP_SPINS = 512; // SimpleQueue.POP_SPINS = 512;
@ -42,12 +44,12 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
} }
private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception { public static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount; int runs = warmUpRuns + sumCount;
final long[] results = new long[runs]; final long[] results = new long[runs];
for (int i = 0; i < runs; i++) { for (int i = 0; i < runs; i++) {
System.gc(); System.gc();
results[i] = performanceRun(i, queue); results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
} }
// only average last X results for summary // only average last X results for summary
long sum = 0; long sum = 0;
@ -58,15 +60,15 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
return sum/sumCount; return sum/sumCount;
} }
private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency]; Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2]; Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) { for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue); producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue); consumers[i] = new Consumer(queue, repetitions);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int j=0,i=0;i<concurrency;i++,j+=2) {
@ -99,25 +101,29 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
long duration = end - start; long duration = end - start;
long ops = REPETITIONS * 1_000_000_000L / duration; long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName(); String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops; return ops;
} }
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final MpmcTransferArrayQueue queue; private final MpmcTransferArrayQueue queue;
volatile long start; volatile long start;
private int repetitions;
public Producer(MpmcTransferArrayQueue queue) { public Producer(MpmcTransferArrayQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
MpmcTransferArrayQueue producer = this.queue; MpmcTransferArrayQueue producer = this.queue;
int i = REPETITIONS; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();
do { do {
@ -132,16 +138,18 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock {
private final MpmcTransferArrayQueue queue; private final MpmcTransferArrayQueue queue;
Object result; Object result;
volatile long end; volatile long end;
private int repetitions;
public Consumer(MpmcTransferArrayQueue queue) { public Consumer(MpmcTransferArrayQueue queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
MpmcTransferArrayQueue consumer = this.queue; MpmcTransferArrayQueue consumer = this.queue;
Object result = null; Object result = null;
int i = REPETITIONS; int i = this.repetitions;
do { do {
while (null == (result = consumer.poll())) { while (null == (result = consumer.poll())) {