diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index b9eec9b..d49f01f 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -11,7 +11,7 @@ import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; +import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -25,17 +25,13 @@ import dorkbox.util.messagebus.subscription.Subscription; */ public class MultiMBassador implements IMessageBus { - // error handling is first-class functionality // this handler will receive all errors that occur during message dispatch or message handling private final Collection errorHandlers = new ArrayDeque(); -// private final TransferQueue dispatchQueue; private final MpmcTransferArrayQueue dispatchQueue; private final SubscriptionManager subscriptionManager; - // all threads that are available for asynchronous message dispatching -// private final int numberOfThreads; private final Collection threads; /** @@ -45,6 +41,11 @@ public class MultiMBassador implements IMessageBus { */ private final boolean forceExactMatches = false; + /** + * Notifies the consumers that the messagebus is shutting down. + */ + private volatile boolean shuttingDown; + /** * By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages @@ -73,9 +74,6 @@ public class MultiMBassador implements IMessageBus { } numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads); -// this.numberOfThreads = numberOfThreads; - -// this.dispatchQueue = new LinkedTransferQueue(); this.dispatchQueue = new MpmcTransferArrayQueue(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads); @@ -88,12 +86,22 @@ public class MultiMBassador implements IMessageBus { @Override public void run() { MpmcTransferArrayQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; -// TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; - Object message1; - while (true) { - message1 = IN_QUEUE.take(); - publish(message1); + Object message1 = null; + while (!MultiMBassador.this.shuttingDown) { + try { + while (true) { + message1 = IN_QUEUE.take(); + publish(message1); + } + } catch (InterruptedException e) { + if (!MultiMBassador.this.shuttingDown) { + handlePublicationError(new PublicationError() + .setMessage("Thread interupted while processing message") + .setCause(e) + .setPublishedObject(message1)); + } + } } } }; @@ -132,12 +140,12 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { -// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads; return this.dispatchQueue.hasPendingMessages(); } @Override public void shutdown() { + this.shuttingDown = true; for (Thread t : this.threads) { t.interrupt(); } @@ -396,22 +404,14 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(final Object message) { if (message != null) { -// Runnable runnable = new Runnable() { -// @Override -// public void run() { -// MultiMBassador.this.publish(message); -// } -// }; - -// try { -// this.dispatchQueue.transfer(runnable); - this.dispatchQueue.transfer(message); -// } catch (InterruptedException e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } + try { + this.dispatchQueue.transfer(message); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcTransferArrayQueue.java similarity index 54% rename from src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java rename to src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcTransferArrayQueue.java index c9721f0..6a89300 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcTransferArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcTransferArrayQueue.java @@ -1,26 +1,23 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; +package dorkbox.util.messagebus.common.simpleq; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; -import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; +import static dorkbox.util.messagebus.common.simpleq.Node.lpItem1; +import static dorkbox.util.messagebus.common.simpleq.Node.lpThread; +import static dorkbox.util.messagebus.common.simpleq.Node.lpType; +import static dorkbox.util.messagebus.common.simpleq.Node.lvItem1; +import static dorkbox.util.messagebus.common.simpleq.Node.lvThread; +import static dorkbox.util.messagebus.common.simpleq.Node.soItem1; +import static dorkbox.util.messagebus.common.simpleq.Node.soThread; +import static dorkbox.util.messagebus.common.simpleq.Node.spItem1; +import static dorkbox.util.messagebus.common.simpleq.Node.spThread; +import static dorkbox.util.messagebus.common.simpleq.Node.spType; -import java.util.Collection; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TransferQueue; import org.jctools.queues.MpmcArrayQueue; import org.jctools.util.UnsafeAccess; -public final class MpmcTransferArrayQueue extends MpmcArrayQueue implements TransferQueue { +public final class MpmcTransferArrayQueue extends MpmcArrayQueue { private static final int TYPE_EMPTY = 0; private static final int TYPE_CONSUMER = 1; private static final int TYPE_PRODUCER = 2; @@ -29,8 +26,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; private static final int INPROGRESS_SPINS = MP ? 32 : 0; - private static final int PUSH_SPINS = MP ? 32 : 0; - private static final int POP_SPINS = MP ? 512 : 0; + private static final int PRODUCER_CAS_FAIL_SPINS = MP ? 512 : 0; + private static final int CONSUMER_CAS_FAIL_SPINS = MP ? 512 : 0; /** @@ -49,12 +46,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme */ private static final int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - private static final long SPIN_THRESHOLD = 1000L; - private final int consumerCount; public MpmcTransferArrayQueue(final int consumerCount) { @@ -75,19 +66,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme *

* Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary. */ - @Override - public final void transfer(final Object item) { - producerXfer(item, false, 0L); - } - - /** - * CONSUMER - *

- * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will - * as long as necessary - */ - @Override - public final Object take() { + public final void transfer(final Object item) throws InterruptedException { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final Object[] buffer = this.buffer; @@ -104,7 +83,111 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); + final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin(INPROGRESS_SPINS); + continue; + } + + lastType = lpType(previousElement); + } + + if (lastType != TYPE_CONSUMER) { + // TYPE_EMPTY, TYPE_PRODUCER + // empty or same mode = push+park onto queue + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; + + if (delta == 0) { + // this is expected if we see this first time around + final long newProducerIndex = producerIndex + 1; + if (casProducerIndex(producerIndex, newProducerIndex)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_PRODUCER); + spThread(node, myThread); + spItem1(node, item); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(buffer, offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore + + park(node, myThread); + + return; + } else { + busySpin(PRODUCER_CAS_FAIL_SPINS); + } + } + } + else { + // TYPE_CONSUMER + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long newConsumerIndex = consumerIndex + 1; + final long delta = seq - newConsumerIndex; + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, newConsumerIndex)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElement(buffer, offset); + spElement(buffer, offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore + + soItem1(e, item); + unpark(e); + + return; + } else { + busySpin(CONSUMER_CAS_FAIL_SPINS); + } + } + } + } + } + + /** + * CONSUMER + *

+ * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will + * as long as necessary + */ + public final Object take() throws InterruptedException { + // local load of field to avoid repeated loads after volatile reads + final long mask = this.mask; + final Object[] buffer = this.buffer; + final long[] sBuffer = this.sequenceBuffer; + + long consumerIndex; + long producerIndex; + int lastType; + + while (true) { + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); + + if (consumerIndex == producerIndex) { + lastType = TYPE_EMPTY; + } else { + final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -143,16 +226,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // (seeing this value from a producer will lead to retry 2) soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore - park(node, myThread, false, 0L); + park(node, myThread); Object item1 = lvItem1(node); return item1; + } else { + busySpin(PRODUCER_CAS_FAIL_SPINS); } } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - } else { + } + else { // TYPE_PRODUCER // complimentary mode = pop+unpark off queue long cSeqOffset = calcSequenceOffset(consumerIndex, mask); @@ -177,11 +260,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme unpark(e); return lvItem1; + } else { + busySpin(CONSUMER_CAS_FAIL_SPINS); } } - - // whoops, inconsistent state - busySpin(POP_SPINS); } } } @@ -190,9 +272,11 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme * CONSUMER *

* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will - * as long as necessary + * as long as necessary. + *

+ * This method does not depend on thread-local for node information, and so is more efficient */ - public final Object take(final Node node) { + public final Object take(final Node node) throws InterruptedException { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final Object[] buffer = this.buffer; @@ -209,7 +293,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); + final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -248,16 +332,16 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme // (seeing this value from a producer will lead to retry 2) soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore - park(node, myThread, false, 0L); + park(node, myThread); Object item1 = lvItem1(node); return item1; + } else { + busySpin(PRODUCER_CAS_FAIL_SPINS); } } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - } else { + } + else { // TYPE_PRODUCER // complimentary mode = pop+unpark off queue long cSeqOffset = calcSequenceOffset(consumerIndex, mask); @@ -282,11 +366,10 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme unpark(e); return lvItem1; + } else { + busySpin(CONSUMER_CAS_FAIL_SPINS); } } - - // whoops, inconsistent state - busySpin(POP_SPINS); } } } @@ -328,6 +411,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore return true; + } else { + busySpin(PRODUCER_CAS_FAIL_SPINS); } // failed cas, retry 1 } else if (delta < 0 && // poll has not moved this value forward @@ -338,130 +423,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme } // another producer has moved the sequence by one, retry 2 - busySpin(PUSH_SPINS); } } - /** - * Will busy spin until timeout - */ - @Override - public boolean offer(Object item, long timeout, TimeUnit unit) throws InterruptedException { - long nanos = unit.toNanos(timeout); - long lastTime = System.nanoTime(); - - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long capacity = mask + 1; - final Object[] buffer = this.buffer; - final long[] sBuffer = this.sequenceBuffer; - - long producerIndex; - long pSeqOffset; - long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - - while (true) { - producerIndex = lvProducerIndex(); // LoadLoad - - pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - final long newProducerIndex = producerIndex + 1; - if (casProducerIndex(producerIndex, newProducerIndex)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(buffer, offset, item); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore - - return true; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - producerIndex - capacity <= consumerIndex && // test against cached cIndex - producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - - long now = System.nanoTime(); - long remaining = nanos -= now - lastTime; - lastTime = now; - - if (remaining > 0) { - if (remaining < SPIN_THRESHOLD) { - busySpin(PARK_UNTIMED_SPINS); - } else { - UnsafeAccess.UNSAFE.park(false, 1L); - } - } else { - return false; - } - } - - // another producer has moved the sequence by one, retry 2 - busySpin(PUSH_SPINS); - } - } - - - @Override - public void put(Object item) throws InterruptedException { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long capacity = mask + 1; - final Object[] buffer = this.buffer; - final long[] sBuffer = this.sequenceBuffer; - - long producerIndex; - long pSeqOffset; - long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - - while (true) { - producerIndex = lvProducerIndex(); // LoadLoad - - pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - final long newProducerIndex = producerIndex + 1; - if (casProducerIndex(producerIndex, newProducerIndex)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(buffer, offset, item); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore - - return; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - producerIndex - capacity <= consumerIndex && // test against cached cIndex - producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - // return false; - busySpin(PUSH_SPINS); - } - - // another producer has moved the sequence by one, retry 2 - busySpin(PUSH_SPINS); - } - } - - // modification of super implementation, as to include a small busySpin on contention @Override @@ -496,6 +460,8 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore return e; + } else { + busySpin(CONSUMER_CAS_FAIL_SPINS); } // failed cas, retry 1 } else if (delta < 0 && // slot has not been moved by producer @@ -506,7 +472,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme } // another consumer beat us and moved sequence ahead, retry 2 - busySpin(POP_SPINS); } } @@ -555,261 +520,6 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme } } - @Override - public boolean tryTransfer(Object item) { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final Object[] buffer = this.buffer; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long producerIndex; - int lastType; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - if (consumerIndex == producerIndex) { - lastType = TYPE_EMPTY; - } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - lastType = lpType(previousElement); - } - - if (lastType != TYPE_CONSUMER) { - // TYPE_EMPTY, TYPE_PRODUCER - // empty or same mode = push+park onto queue - long pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // don't add to queue, abort!! - return false; - } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - } else { - // TYPE_CONSUMER - // complimentary mode = pop+unpark off queue - long cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long newConsumerIndex = consumerIndex + 1; - final long delta = seq - newConsumerIndex; - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, newConsumerIndex)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(buffer, offset); - spElement(buffer, offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore - - soItem1(e, item); - unpark(e); - - return true; - } - } - - // whoops, inconsistent state - busySpin(POP_SPINS); - } - } - } - - @Override - public boolean tryTransfer(Object item, long timeout, TimeUnit unit) throws InterruptedException { - long nanos = unit.toNanos(timeout); - return producerXfer(item, true, nanos); - } - - @Override - public Object poll(long timeout, TimeUnit unit) throws InterruptedException { - long nanos = unit.toNanos(timeout); - long lastTime = System.nanoTime(); - - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final Object[] buffer = this.buffer; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long cSeqOffset; - long producerIndex = -1; // start with bogus value, hope we don't need it - - while (true) { - consumerIndex = lvConsumerIndex(); // LoadLoad - cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long newConsumerIndex = consumerIndex + 1; - final long delta = seq - newConsumerIndex; - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, newConsumerIndex)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(buffer, offset); - spElement(buffer, offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore - - return e; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - consumerIndex >= producerIndex && // test against cached pIndex - consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - - long now = System.nanoTime(); - long remaining = nanos -= now - lastTime; - lastTime = now; - - if (remaining > 0) { - if (remaining < SPIN_THRESHOLD) { - busySpin(PARK_UNTIMED_SPINS); - } else { - UnsafeAccess.UNSAFE.park(false, 1L); - } - // make sure to continue here (so we don't spin twice) - continue; - } else { - return null; - } - } - - // another consumer beat us and moved sequence ahead, retry 2 - busySpin(POP_SPINS); - } - } - - @Override - public int remainingCapacity() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int drainTo(Collection c) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int drainTo(Collection c, int maxElements) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public Object[] toArray(Object[] a) { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean containsAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean addAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean removeAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean retainAll(Collection c) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean hasWaitingConsumer() { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final Object[] buffer = this.buffer; - - long consumerIndex; - long producerIndex; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - if (consumerIndex == producerIndex) { - return false; - } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - return lpType(previousElement) == TYPE_CONSUMER; - } - } - } - - @Override - public int getWaitingConsumerCount() { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final Object[] buffer = this.buffer; - - long consumerIndex; - long producerIndex; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - if (consumerIndex == producerIndex) { - return 0; - } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - if (lpType(previousElement) == TYPE_CONSUMER) { - return (int) (producerIndex - consumerIndex); - } else { - return 0; - } - } - } - } - - public final boolean hasPendingMessages() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; @@ -825,7 +535,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme if (consumerIndex == producerIndex) { return true; } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); + final Object previousElement = lpElement(buffer, calcElementOffset(consumerIndex, mask)); if (previousElement == null) { // the last producer hasn't finished setting the object yet busySpin(INPROGRESS_SPINS); @@ -847,48 +557,24 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme } } - - /** - * @return true if the park was correctly unparked. false if there was an interruption or a timed-park expired - */ @SuppressWarnings("null") - private final boolean park(final Object node, final Thread myThread, final boolean timed, long nanos) { - long lastTime = System.nanoTime(); + private final void park(final Object node, final Thread myThread) throws InterruptedException { int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) { if (lvThread(node) == null) { - return true; - } else if (myThread.isInterrupted() || timed && nanos <= 0) { - return false; + return; + } else if (myThread.isInterrupted()) { + throw new InterruptedException(); } else if (spins < 0) { - if (timed) { - spins = PARK_TIMED_SPINS; - } else { - spins = PARK_UNTIMED_SPINS; - } - + spins = PARK_UNTIMED_SPINS; randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { if (randomYields.nextInt(1024) == 0) { UnsafeAccess.UNSAFE.park(false, 1L); } --spins; - } else if (timed) { - long now = System.nanoTime(); - long remaining = nanos -= now - lastTime; - lastTime = now; - if (remaining > 0) { - if (remaining < SPIN_THRESHOLD) { - // a park is too slow for this time, so just busy spin instead - busySpin(PARK_UNTIMED_SPINS); - } else { - UnsafeAccess.UNSAFE.park(false, nanos); - } - } else { - return false; - } } else { // park can return for NO REASON (must check for thread values) UnsafeAccess.UNSAFE.park(false, 0L); @@ -901,105 +587,4 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueue impleme soThread(node, null); UnsafeAccess.UNSAFE.unpark(thread); } - - /** - * @return true if the producer successfully transferred an item (either through waiting for a consumer that took it, or transferring - * directly to an already waiting consumer). false if the thread was interrupted, or it was timed and has expired. - */ - private final boolean producerXfer(final Object item, final boolean timed, final long nanos) { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final Object[] buffer = this.buffer; - final long[] sBuffer = this.sequenceBuffer; - - long consumerIndex; - long producerIndex; - int lastType; - - while (true) { - consumerIndex = lvConsumerIndex(); - producerIndex = lvProducerIndex(); - - if (consumerIndex == producerIndex) { - lastType = TYPE_EMPTY; - } else { - final Object previousElement = lpElement(buffer, calcElementOffset(producerIndex-1, mask)); - if (previousElement == null) { - // the last producer hasn't finished setting the object yet - busySpin(INPROGRESS_SPINS); - continue; - } - - lastType = lpType(previousElement); - } - - if (lastType != TYPE_CONSUMER) { - // TYPE_EMPTY, TYPE_PRODUCER - // empty or same mode = push+park onto queue - long pSeqOffset = calcSequenceOffset(producerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - producerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - final long newProducerIndex = producerIndex + 1; - if (casProducerIndex(producerIndex, newProducerIndex)) { - // Successful CAS: full barrier - - final Thread myThread = Thread.currentThread(); - final Object node = nodeThreadLocal.get(); - - spType(node, TYPE_PRODUCER); - spThread(node, myThread); - spItem1(node, item); - - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(producerIndex, mask); - spElement(buffer, offset, node); - - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, newProducerIndex); // StoreStore - - return park(node, myThread, timed, nanos); - } - } - - // whoops, inconsistent state - busySpin(PUSH_SPINS); - } else { - // TYPE_CONSUMER - // complimentary mode = pop+unpark off queue - long cSeqOffset = calcSequenceOffset(consumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long newConsumerIndex = consumerIndex + 1; - final long delta = seq - newConsumerIndex; - - if (delta == 0) { - if (casConsumerIndex(consumerIndex, newConsumerIndex)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(consumerIndex, mask); - final Object e = lpElement(buffer, offset); - spElement(buffer, offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore - - soItem1(e, item); - unpark(e); - - return true; - } - } - - // whoops, inconsistent state - busySpin(POP_SPINS); - } - } - } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java similarity index 98% rename from src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java rename to src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 8c1eaf2..35298ca 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,4 +1,4 @@ -package dorkbox.util.messagebus.common.simpleq.jctools; +package dorkbox.util.messagebus.common.simpleq; import org.jctools.util.UnsafeAccess; diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java index 90f813d..d3b2335 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue.java @@ -1,149 +1,26 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package dorkbox.util.messagebus; import java.util.concurrent.LinkedBlockingQueue; public class PerfTest_LinkedBlockingQueue { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - - private static final int concurrency = 2; public static void main(final String[] args) throws Exception { - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17); + final int warmupRuns = 2; + final int runs = 3; - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); - } - - private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency*2]; - - for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; - String qName = queue.getClass().getSimpleName(); - - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - return ops; - } - - public static class Producer implements Runnable { - private final LinkedBlockingQueue queue; - volatile long start; - - public Producer(LinkedBlockingQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - LinkedBlockingQueue producer = this.queue; - int i = REPETITIONS; - this.start = System.nanoTime(); - - try { - do { - producer.put(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - // log.error(e); - } - } - } - - public static class Consumer implements Runnable { - private final LinkedBlockingQueue queue; - Object result; - volatile long end; - - public Consumer(LinkedBlockingQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - LinkedBlockingQueue consumer = this.queue; - Object result = null; - int i = REPETITIONS; - - try { - do { - result = consumer.take(); - } while (0 != --i); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - // log.error(e); - } - - this.result = result; - this.end = System.nanoTime(); + for (int concurrency = 1; concurrency < 5; concurrency++) { + final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); + long average = PerfTest_LinkedBlockingQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); + System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency); } } } diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_Block.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_Block.java new file mode 100644 index 0000000..42da1c1 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_Block.java @@ -0,0 +1,144 @@ +package dorkbox.util.messagebus; + +import java.util.concurrent.LinkedBlockingQueue; + +public class PerfTest_LinkedBlockingQueue_Block { + public static final int REPETITIONS = 50 * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + private static final int concurrency = 4; + + public static void main(final String[] args) throws Exception { + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); + + final int warmupRuns = 4; + final int runs = 5; + + final LinkedBlockingQueue queue = new LinkedBlockingQueue(concurrency); + long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); + } + + public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = repetitions * 1_000_000_000L / duration; + String qName = queue.getClass().getSimpleName(); + + if (showStats) { + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + } + return ops; + } + + public static class Producer implements Runnable { + private final LinkedBlockingQueue queue; + volatile long start; + private int repetitions; + + public Producer(LinkedBlockingQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + LinkedBlockingQueue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + + try { + do { + producer.put(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final LinkedBlockingQueue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(LinkedBlockingQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + LinkedBlockingQueue consumer = this.queue; + Object result = null; + int i = this.repetitions; + + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java index 6ac1775..224607a 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedBlockingQueue_NonBlock.java @@ -1,57 +1,54 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package dorkbox.util.messagebus; import java.util.concurrent.LinkedBlockingQueue; public class PerfTest_LinkedBlockingQueue_NonBlock { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 2; + private static final int concurrency = 4; public static void main(final String[] args) throws Exception { - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final LinkedBlockingQueue queue = new LinkedBlockingQueue(1 << 17); + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + final int warmupRuns = 5; + final int runs = 5; + + long average = 0; + + final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); + average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); } - private static long performanceRun(int runNumber, LinkedBlockingQueue queue) throws Exception { + public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { Producer[] producers = new Producer[concurrency]; Consumer[] consumers = new Consumer[concurrency]; Thread[] threads = new Thread[concurrency*2]; for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; - String qName = queue.getClass().getSimpleName(); - - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - return ops; - } - - public static class Producer implements Runnable { - private final LinkedTransferQueue queue; - volatile long start; - - public Producer(LinkedTransferQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - LinkedTransferQueue producer = this.queue; - int i = REPETITIONS; - this.start = System.nanoTime(); - - try { - do { - producer.transfer(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - public static class Consumer implements Runnable { - private final LinkedTransferQueue queue; - Object result; - volatile long end; - - public Consumer(LinkedTransferQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - LinkedTransferQueue consumer = this.queue; - Object result = null; - int i = REPETITIONS; - - try { - do { - result = consumer.take(); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - this.result = result; - this.end = System.nanoTime(); + for (int concurrency = 1; concurrency < 5; concurrency++) { + final LinkedTransferQueue queue = new LinkedTransferQueue(); + long average = PerfTest_LinkedTransferQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); + System.out.format("PerfTest_LinkedTransferQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency); } } } diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_Block.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_Block.java new file mode 100644 index 0000000..cc11223 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_Block.java @@ -0,0 +1,149 @@ +package dorkbox.util.messagebus; + +import java.util.concurrent.LinkedTransferQueue; + +public class PerfTest_LinkedTransferQueue_Block { + public static final int REPETITIONS = 50 * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + private static final int concurrency = 4; + + public static void main(final String[] args) throws Exception { + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); + + final int warmupRuns = 4; + final int runs = 5; + + final LinkedTransferQueue queue = new LinkedTransferQueue(); + long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); + + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); + } + + public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = repetitions * 1_000_000_000L / duration; + String qName = queue.getClass().getSimpleName(); + + if (showStats) { + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + } + return ops; + } + + public static class Producer implements Runnable { + private final LinkedTransferQueue queue; + volatile long start; + private int repetitions; + + public Producer(LinkedTransferQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + LinkedTransferQueue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + + try { + do { + producer.transfer(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + } + } + + public static class Consumer implements Runnable { + private final LinkedTransferQueue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(LinkedTransferQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + LinkedTransferQueue consumer = this.queue; + Object result = null; + int i = this.repetitions; + + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java index 4e46ffd..76af1cd 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_LinkedTransferQueue_NonBlock.java @@ -1,54 +1,54 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ package dorkbox.util.messagebus; import java.util.concurrent.LinkedTransferQueue; public class PerfTest_LinkedTransferQueue_NonBlock { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + public static final int QUEUE_CAPACITY = 1 << 17; private static final int concurrency = 4; public static void main(final String[] args) throws Exception { - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final LinkedTransferQueue queue = new LinkedTransferQueue(); + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + final int warmupRuns = 5; + final int runs = 5; + + long average = 0; + + final LinkedTransferQueue queue = new LinkedTransferQueue(); + average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); } - private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { + public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { Producer[] producers = new Producer[concurrency]; Consumer[] consumers = new Consumer[concurrency]; Thread[] threads = new Thread[concurrency*2]; for (int i=0;i average) { -// average = newAverage; -// System.err.println("Best value: " + i + " : " + newAverage); -// } -// } - - - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); - } - - private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception { - int runs = warmUpRuns + sumCount; - final long[] results = new long[runs]; - for (int i = 0; i < runs; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last X results for summary - long sum = 0; - for (int i = warmUpRuns; i < runs; i++) { - sum += results[i]; + for (int concurrency = 1; concurrency < 5; concurrency++) { + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency); + long average = PerfTest_MpmcTransferArrayQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); + System.out.format("PerfTest_MpmcTransferArrayQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency); } - return sum/sumCount; - } - - private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency*2]; - - for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = REPETITIONS * 1_000_000_000L / duration; - String qName = queue.getClass().getSimpleName(); - - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - return ops; - } - - public static class Producer implements Runnable { - private final MpmcTransferArrayQueue queue; - volatile long start; - - public Producer(MpmcTransferArrayQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - MpmcTransferArrayQueue producer = this.queue; - int i = REPETITIONS; - this.start = System.nanoTime(); - - do { - producer.transfer(TEST_VALUE); - } while (0 != --i); - } - } - - public static class Consumer implements Runnable { - private final MpmcTransferArrayQueue queue; - Object result; - volatile long end; - - public Consumer(MpmcTransferArrayQueue queue) { - this.queue = queue; - } - - @Override - public void run() { - MpmcTransferArrayQueue consumer = this.queue; - Object result = null; - int i = REPETITIONS; - - Node node = new Node(); - do { - result = consumer.take(node); - } while (0 != --i); - - this.result = result; - this.end = System.nanoTime(); + for (int concurrency = 1; concurrency < 5; concurrency++) { + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency); + long average = PerfTest_MpmcTransferArrayQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); + System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency); } } } diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_Block.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_Block.java new file mode 100644 index 0000000..3680ad6 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_Block.java @@ -0,0 +1,181 @@ +package dorkbox.util.messagebus; + +import org.openjdk.jol.info.ClassLayout; +import org.openjdk.jol.util.VMSupport; + +import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue; +import dorkbox.util.messagebus.common.simpleq.Node; + +public class PerfTest_MpmcTransferArrayQueue_Block { +// static { +// System.setProperty("sparse.shift", "2"); +// } + + public static final int REPETITIONS = 50 * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + private static final int concurrency = 4; + + public static void main(final String[] args) throws Exception { + + System.out.println(VMSupport.vmDetails()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); + + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); + + final int warmupRuns = 4; + final int runs = 5; + + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency); + long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); + +// SimpleQueue.INPROGRESS_SPINS = 64; +// SimpleQueue.POP_SPINS = 512; +// SimpleQueue.PUSH_SPINS = 512; +// SimpleQueue.PARK_SPINS = 512; +// +// for (int i = 128; i< 10000;i++) { +// int full = 2*i; +// +// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); +// SimpleQueue.PARK_SPINS = full; +// +// +// long newAverage = averageRun(warmupRuns, runs, queue); +// if (newAverage > average) { +// average = newAverage; +// System.err.println("Best value: " + i + " : " + newAverage); +// } +// } + + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); + } + + public static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = repetitions * 1_000_000_000L / duration; + String qName = queue.getClass().getSimpleName(); + + if (showStats) { + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + } + return ops; + } + + public static class Producer implements Runnable { + private final MpmcTransferArrayQueue queue; + volatile long start; + private int repetitions; + + public Producer(MpmcTransferArrayQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + MpmcTransferArrayQueue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + + try { + do { + producer.transfer(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + } + } + + public static class Consumer implements Runnable { + private final MpmcTransferArrayQueue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(MpmcTransferArrayQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + @Override + public void run() { + MpmcTransferArrayQueue consumer = this.queue; + Object result = null; + int i = this.repetitions; + + Node node = new Node(); + try { + do { + result = consumer.take(node); + } while (0 != --i); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java index 1ed67a1..3f31a72 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MpmcTransferArrayQueue_NonBlock.java @@ -1,23 +1,25 @@ package dorkbox.util.messagebus; -import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue; +import dorkbox.util.messagebus.common.simpleq.MpmcTransferArrayQueue; public class PerfTest_MpmcTransferArrayQueue_NonBlock { - public static final int REPETITIONS = 50 * 1000 * 1000; + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 1; + private static final int concurrency = 4; public static void main(final String[] args) throws Exception { + System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); + final int warmupRuns = 5; final int runs = 5; long average = 0; - final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency, QUEUE_CAPACITY); - average = averageRun(warmupRuns, runs, queue); + final MpmcTransferArrayQueue queue = new MpmcTransferArrayQueue(concurrency); + average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); // SimpleQueue.INPROGRESS_SPINS = 64; // SimpleQueue.POP_SPINS = 512; @@ -42,12 +44,12 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock { System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); } - private static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue) throws Exception { + public static long averageRun(int warmUpRuns, int sumCount, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { int runs = warmUpRuns + sumCount; final long[] results = new long[runs]; for (int i = 0; i < runs; i++) { System.gc(); - results[i] = performanceRun(i, queue); + results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); } // only average last X results for summary long sum = 0; @@ -58,15 +60,15 @@ public class PerfTest_MpmcTransferArrayQueue_NonBlock { return sum/sumCount; } - private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue) throws Exception { + private static long performanceRun(int runNumber, MpmcTransferArrayQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { Producer[] producers = new Producer[concurrency]; Consumer[] consumers = new Consumer[concurrency]; Thread[] threads = new Thread[concurrency*2]; for (int i=0;i