From 2705daa07319f50f5045c56fff0ae29246fcbc14 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 3 May 2015 22:43:35 +0200 Subject: [PATCH] much better results. almost same as LTQ, but with no heap objects --- .../util/messagebus/MultiMBassador.java | 23 +- .../util/messagebus/common/simpleq/Node.java | 2 +- .../jctools/MpmcArrayTransferQueue.java | 114 +---- .../common/simpleq/jctools/SimpleQueue.java | 482 ++++++++++++------ .../LinkTransferQueueConcurrentPerfTest.java | 4 +- .../messagebus/SimpleQueueAltPerfTest.java | 6 +- 6 files changed, 346 insertions(+), 285 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index e93f086..4d44bf5 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -7,11 +7,10 @@ import java.util.concurrent.TimeUnit; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.ISetEntry; -import dorkbox.util.messagebus.common.LinkedTransferQueue; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; -import dorkbox.util.messagebus.common.TransferQueue; import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; +import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -42,8 +41,8 @@ public class MultiMBassador implements IMessageBus { // this handler will receive all errors that occur during message dispatch or message handling private final Collection errorHandlers = new ArrayDeque(); - private final TransferQueue dispatchQueue; -// private final SimpleQueue dispatchQueue; +// private final TransferQueue dispatchQueue; + private final SimpleQueue dispatchQueue; private final SubscriptionManager subscriptionManager; @@ -88,8 +87,8 @@ public class MultiMBassador implements IMessageBus { this.numberOfThreads = numberOfThreads; - this.dispatchQueue = new LinkedTransferQueue(); -// this.dispatchQueue = new SimpleQueue(numberOfThreads); +// this.dispatchQueue = new LinkedTransferQueue(); + this.dispatchQueue = new SimpleQueue(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.threads = new ArrayDeque(numberOfThreads); @@ -100,8 +99,8 @@ public class MultiMBassador implements IMessageBus { Runnable runnable = new Runnable() { @Override public void run() { -// SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; - TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; + SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; +// TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; Object message1; try { @@ -150,8 +149,8 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { - return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads; -// return this.dispatchQueue.hasPendingMessages(); +// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads; + return this.dispatchQueue.hasPendingMessages(); } @Override @@ -422,8 +421,8 @@ public class MultiMBassador implements IMessageBus { }; try { - this.dispatchQueue.transfer(runnable); -// this.dispatchQueue.put(message); +// this.dispatchQueue.transfer(runnable); + this.dispatchQueue.put(message); } catch (InterruptedException e) { handlePublicationError(new PublicationError() .setMessage("Error while adding an asynchronous message") diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 204e617..a6422f6 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -7,7 +7,7 @@ package dorkbox.util.messagebus.common.simpleq; abstract class PrePad { // volatile long y0, y1, y2, y4, y5, y6 = 7L; -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class ColdItems extends PrePad { diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java index 629dfcd..c28d439 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java @@ -1,7 +1,6 @@ package dorkbox.util.messagebus.common.simpleq.jctools; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.locks.LockSupport; import dorkbox.util.messagebus.common.simpleq.Node; @@ -35,67 +34,6 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField= producerIndex && // test against cached pIndex -// consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must -// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// return null; - } - - // another consumer beat us and moved sequence ahead, retry 2 - return null; - } - public Object take(final boolean timed, final long nanos) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; @@ -494,18 +393,19 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField 0) { - --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) { - LockSupport.parkNanos(1); // occasionally yield +// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); + break; } + --spins; } else { break; } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java index 5cf4551..2b8258c 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java @@ -2,11 +2,12 @@ package dorkbox.util.messagebus.common.simpleq.jctools; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import dorkbox.util.messagebus.common.simpleq.Node; -public final class SimpleQueue { +public final class SimpleQueue extends MpmcArrayQueueConsumerField { public static final int TYPE_EMPTY = 0; public static final int TYPE_CONSUMER = 1; public static final int TYPE_PRODUCER = 2; @@ -46,10 +47,6 @@ public final class SimpleQueue { UNSAFE.putInt(node, TYPE, type); } - private static final void soType(Object node, int type) { - UNSAFE.putOrderedInt(node, TYPE, type); - } - private static final int lpType(Object node) { return UNSAFE.getInt(node, TYPE); } @@ -70,9 +67,27 @@ public final class SimpleQueue { return UNSAFE.getObjectVolatile(node, THREAD); } - private static final boolean casThread(Object node, Object expect, Object newValue) { - return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue); - } + /** The number of CPUs */ + private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; + + /** + * The number of times to spin (with randomly interspersed calls + * to Thread.yield) on multiprocessor before blocking when a node + * is apparently the first waiter in the queue. See above for + * explanation. Must be a power of two. The value is empirically + * derived -- it works pretty well across a variety of processors, + * numbers of CPUs, and OSes. + */ + private static final int FRONT_SPINS = 1 << 7; + + /** + * The number of times to spin before blocking when a node is + * preceded by another node that is apparently spinning. Also + * serves as an increment to FRONT_SPINS on phase changes, and as + * base average frequency for yielding during spins. Must be a + * power of two. + */ + private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** The number of CPUs */ @@ -85,7 +100,7 @@ public final class SimpleQueue { * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. * The value here is approximately the average of those across a range of tested systems. */ - private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000 + private static final int SPINS = MP ? 0 : 512; // orig: 2000 /** * The number of times to spin before blocking in timed waits. @@ -101,7 +116,7 @@ public final class SimpleQueue { * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. */ - static final int maxUntimedSpins = maxTimedSpins * 32; + static final int maxUntimedSpins = maxTimedSpins * 16; static final int negMaxUntimedSpins = -maxUntimedSpins; /** @@ -109,201 +124,255 @@ public final class SimpleQueue { * rather than to use timed park. A rough estimate suffices. */ static final long spinForTimeoutThreshold = 1000L; - - private MpmcArrayTransferQueue queue; - private MpmcArrayTransferQueue pool; + private int size; public SimpleQueue(final int size) { - int roundToPowerOfTwo = Pow2.roundToPowerOfTwo(size); - - this.queue = new MpmcArrayTransferQueue(roundToPowerOfTwo); - this.pool = new MpmcArrayTransferQueue(roundToPowerOfTwo); - - for (int i=0;i nodeThreadLocal = new ThreadLocal() { + @Override + protected Object initialValue() { + return new Node(); + } + }; + + /** * PRODUCER */ public void put(Object item) throws InterruptedException { - final MpmcArrayTransferQueue queue = this.queue; - final MpmcArrayTransferQueue pool = this.pool; - final Thread myThread = Thread.currentThread(); - Object node = null; // local load of field to avoid repeated loads after volatile reads - final long mask = queue.mask; - final long[] sBuffer = queue.sequenceBuffer; + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; - long cSeqOffset; - - long currConsumerIndex; - long currProducerIndex; + long consumerIndex; + long producerIndex; int lastType; while (true) { - currConsumerIndex = queue.lvConsumerIndex(); - currProducerIndex = queue.lvProducerIndex(); + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); - if (currConsumerIndex == currProducerIndex) { + final Object previousElement; + if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; + previousElement = null; } else { - cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask); - final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (currConsumerIndex + 1); - - if (delta == 0) { - final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex)); - if (lpElementNoCast == null) { - continue; - } - - lastType = lpType(lpElementNoCast); - } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin_InProgress(); continue; } + + lastType = lpType(previousElement); } switch (lastType) { case TYPE_EMPTY: case TYPE_PRODUCER: { // empty or same mode = push+park onto queue - if (node == null) { - node = pool.take(false, 0); + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; - spType(node, TYPE_PRODUCER); - spThread(node, myThread); - spItem1(node, item); + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_PRODUCER); + spThread(node, myThread); + spItem1(node, item); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + park(node, myThread, false, 0); + return; + } } - if (!queue.putExact(currProducerIndex, node, false, 0)) { - // whoops, inconsistent state -// busySpin2(); - continue; - } - park(node, myThread, false, 0); - - return; + // whoops, inconsistent state + busySpin_pushConflict(); + continue; } case TYPE_CONSUMER: { - // complimentary mode = pop+unpark off queue - if (node != null) { - pool.put(node, false, 0); - } + // complimentary mode = pop+unpark off queue + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); - node = queue.takeExact(currConsumerIndex, false, 0); - if (node == null) { - // whoops, inconsistent state -// busySpin2(); - continue; - } + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier - soItem1(node, item); - unpark(node); + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); - pool.put(node, false, 0); - return; + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + soItem1(e, item); + unpark(e); + + return; + } + } + + // whoops, inconsistent state + busySpin_popConflict(); + continue; } } } } - /** * CONSUMER */ public Object take() throws InterruptedException { - final MpmcArrayTransferQueue queue = this.queue; - final MpmcArrayTransferQueue pool = this.pool; - final Thread myThread = Thread.currentThread(); - Object node = null; - - // local load of field to avoid repeated loads after volatile reads - final long mask = queue.mask; - final long[] sBuffer = queue.sequenceBuffer; + final long mask = this.mask; + final long[] sBuffer = this.sequenceBuffer; - long cSeqOffset; - long currConsumerIndex; - long currProducerIndex; + long consumerIndex; + long producerIndex; int lastType; while (true) { - currConsumerIndex = queue.lvConsumerIndex(); - currProducerIndex = queue.lvProducerIndex(); + consumerIndex = lvConsumerIndex(); + producerIndex = lvProducerIndex(); - if (currConsumerIndex == currProducerIndex) { + final Object previousElement; + if (consumerIndex == producerIndex) { lastType = TYPE_EMPTY; + previousElement = null; } else { - cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask); - final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (currConsumerIndex + 1); - - if (delta == 0) { - final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex)); - if (lpElementNoCast == null) { - continue; - } - - lastType = lpType(lpElementNoCast); - } else { + previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement == null) { + // the last producer hasn't finished setting the object yet + busySpin_InProgress(); continue; } + + lastType = lpType(previousElement); } switch (lastType) { case TYPE_EMPTY: - case TYPE_CONSUMER: - { + case TYPE_CONSUMER: { // empty or same mode = push+park onto queue - if (node == null) { - node = pool.take(false, 0); + long pSeqOffset = calcSequenceOffset(producerIndex, mask); + final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + final long delta = seq - producerIndex; - spType(node, TYPE_CONSUMER); - spThread(node, myThread); + if (delta == 0) { + // this is expected if we see this first time around + if (casProducerIndex(producerIndex, producerIndex + 1)) { + // Successful CAS: full barrier + + final Thread myThread = Thread.currentThread(); + final Object node = nodeThreadLocal.get(); + + spType(node, TYPE_CONSUMER); + spThread(node, myThread); + + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(producerIndex, mask); + spElement(offset, node); + + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore + + park(node, myThread, false, 0); + Object item1 = lvItem1(node); + + return item1; + } } - if (!queue.putExact(currProducerIndex, node, false, 0)) { - // whoops, inconsistent state -// busySpin2(); - continue; - } - park(node, myThread, false, 0); - - Object item1 = lvItem1(node); - return item1; + // whoops, inconsistent state + busySpin_pushConflict(); + continue; } case TYPE_PRODUCER: { // complimentary mode = pop+unpark off queue - if (node != null) { - pool.put(node, false, 0); + long cSeqOffset = calcSequenceOffset(consumerIndex, mask); + final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long delta = seq - (consumerIndex + 1); + + if (delta == 0) { + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) { + // Successful CAS: full barrier + + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(consumerIndex, mask); + final Object e = lpElementNoCast(offset); + spElement(offset, null); + + // Move sequence ahead by capacity, preparing it for next offer + // (seeing this value from a consumer will lead to retry 2) + soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore + + final Object lvItem1 = lpItem1(e); + unpark(e); + + return lvItem1; + } } - node = queue.takeExact(currConsumerIndex, false, 0); - if (node == null) { - // whoops, inconsistent state -// busySpin2(); - continue; - } - - final Object lvItem1 = lpItem1(node); - unpark(node); - - pool.put(node, false, 0); - return lvItem1; + // whoops, inconsistent state + busySpin_popConflict(); + continue; } } } } - private static final void busySpin2() { + /** + * Spin for when the current thread is waiting for the item to be set. The producer index has incremented, but the + * item isn't present yet. + * @param random + */ + private static final void busySpin_InProgress() { +// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); +// +// if (randomYields.nextInt(1) != 0) { +////// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); +////// break; +// } + // busy spin for the amount of time (roughly) of a CPU context switch - int spins = maxUntimedSpins; + int spins = 128; for (;;) { if (spins > 0) { +// if (randomYields.nextInt(CHAINED_SPINS) == 0) { +//// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); +//// break; +// } --spins; } else { break; @@ -311,12 +380,75 @@ public final class SimpleQueue { } } - private static final void busySpin() { + private static final void busySpin_pushConflict() { +// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); + // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; + int spins = 256; + for (;;) { + if (spins > 0) { +// if (randomYields.nextInt(1) != 0) { +////// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); +//// break; +// } + --spins; + } else { + break; + } + } + } + + private static final void busySpin_popConflict() { +// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); + + // busy spin for the amount of time (roughly) of a CPU context switch + int spins = 64; + for (;;) { + if (spins > 0) { +// if (randomYields.nextInt(1) != 0) { +////// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); +//// break; +// } + --spins; + } else { + break; + } + } + } + + private static final void busySpin2() { + ThreadLocalRandom randomYields = ThreadLocalRandom.current(); + + // busy spin for the amount of time (roughly) of a CPU context switch + int spins = 64; + for (;;) { + if (spins > 0) { + if (randomYields.nextInt(1) != 0) { +//// LockSupport.parkNanos(1); // occasionally yield + Thread.yield(); +// break; + } + --spins; + } else { + break; + } + } + } + + private static final void busySpin(ThreadLocalRandom random) { + // busy spin for the amount of time (roughly) of a CPU context switch +// int spins = spinsFor(); + int spins = 128; for (;;) { if (spins > 0) { --spins; + if (random.nextInt(CHAINED_SPINS) == 0) { +//// LockSupport.parkNanos(1); // occasionally yield +// Thread.yield(); + break; + } } else { break; } @@ -335,7 +467,19 @@ public final class SimpleQueue { public boolean hasPendingMessages() { // count the number of consumers waiting, it should be the same as the number of threads configured // return this.consumersWaiting.size() == this.numberConsumerThreads; - return false; +// return false; + + long consumerIndex = lvConsumerIndex(); + long producerIndex = lvProducerIndex(); + + if (consumerIndex != producerIndex) { + final Object previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); + if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.size == producerIndex) { + return false; + } + } + + return true; } public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { @@ -343,12 +487,13 @@ public final class SimpleQueue { } public final void park(final Object node, final Thread myThread, final boolean timed, final long nanos) throws InterruptedException { -// if (casThread(node, null, myThread)) { - // we won against the other thread + ThreadLocalRandom randomYields = null; // bound if needed + // long lastTime = timed ? System.nanoTime() : 0; // int spins = timed ? maxTimedSpins : maxUntimedSpins; - int spins = maxTimedSpins; +// int spins = maxTimedSpins; + int spins = 51200; // if (timed) { // long now = System.nanoTime(); @@ -360,36 +505,53 @@ public final class SimpleQueue { // } // } - // busy spin for the amount of time (roughly) of a CPU context switch - // then park (if necessary) - for (;;) { - if (lpThread(node) == null) { - return; - } else if (spins > 0) { - --spins; -// } else if (spins > negMaxUntimedSpins) { -// --spins; -// UNSAFE.park(false, 1L); - } else { - // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. - UNSAFE.park(false, 0L); - - if (myThread.isInterrupted()) { -// casThread(node, myThread, null); - Thread.interrupted(); - throw new InterruptedException(); - } - } + for (;;) { + if (lvThread(node) == null) { + return; + } else if (spins > 0) { +// if (randomYields == null) { +// randomYields = ThreadLocalRandom.current(); +// } else if (randomYields.nextInt(spins) == 0) { +// Thread.yield(); // occasionally yield +// } + --spins; + } else if (myThread.isInterrupted()) { + Thread.interrupted(); + throw new InterruptedException(); + } else { + // park can return for NO REASON (must check for thread values) + UNSAFE.park(false, 0L); } -// } + } } public void unpark(Object node) { final Object thread = lpThread(node); soThread(node, null); UNSAFE.unpark(thread); + } -// if (thread != null && casThread(node, thread, Thread.currentThread())) { -// } + @Override + public boolean offer(Node message) { + // TODO Auto-generated method stub + return false; + } + + @Override + public Node poll() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Node peek() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int size() { + // TODO Auto-generated method stub + return 0; } } diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java index e50088e..244d594 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java @@ -20,12 +20,12 @@ import dorkbox.util.messagebus.common.simpleq.Node; public class LinkTransferQueueConcurrentPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 2; + private static final int concurrency = 8; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java index 69bdda0..9de2822 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java @@ -20,12 +20,12 @@ import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; public class SimpleQueueAltPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 2; + private static final int concurrency = 8; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); @@ -88,7 +88,7 @@ public class SimpleQueueAltPerfTest { long duration = end - start; - long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + long ops = REPETITIONS * 1_000_000_000L / duration; String qName = queue.getClass().getSimpleName(); System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);