WIP, crap multiprocessor performance with simplequeue abstraction, 160k/s

This commit is contained in:
nathan 2015-05-03 15:55:26 +02:00
parent 2e1feacc0e
commit 8c01959080
9 changed files with 770 additions and 231 deletions

View File

@ -2,6 +2,7 @@ package dorkbox.util.messagebus.common.simpleq;
// mpmc sparse.shift = 2, for this to be fast.
abstract class PrePad {
@ -14,7 +15,8 @@ abstract class ColdItems extends PrePad {
// public final int ID = count.getAndIncrement();
public int type = 0;
// public short type = MessageType.ONE;
// public short messageType = MessageType.ONE;
public Object item1 = null;
// public Object item2 = null;
// public Object item3 = null;
@ -31,11 +33,11 @@ abstract class HotItem1 extends Pad0 {
abstract class Pad1 extends HotItem1 {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem2 extends Pad1 {
public transient volatile Thread thread;
public Thread thread;
}
abstract class Pad2 extends HotItem2 {
@ -50,7 +52,7 @@ abstract class HotItem3 extends Pad2 {
public class Node extends HotItem3 {
// post-padding
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() {
}

View File

@ -1,8 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
/**
 * Holder for the named {@code short} constants {@link #FREE} and {@link #CANCELLED}.
 * The class carries no instance state and cannot be instantiated.
 */
public class NodeState {

    /** Not instantiable: this type only exposes constants. */
    private NodeState() {
    }

    /** Constant with the value {@code 0}. */
    public static final short FREE = 0;

    /** Constant with the value {@code 1}. */
    public static final short CANCELLED = 1;
}

View File

@ -1,7 +1,5 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
@ -9,24 +7,6 @@ import dorkbox.util.messagebus.common.simpleq.Node;
public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<Node> {
public static final int TYPE_EMPTY = 0;
public static final int TYPE_CONSUMER = 1;
public static final int TYPE_PRODUCER = 2;
private static final long TYPE;
static {
try {
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private static final int lpType(Object node) {
return UNSAFE.getInt(node, TYPE);
}
/** The number of CPUs */
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
@ -76,56 +56,46 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
* @param nanos
* @return the offset that the item was placed into
*/
public boolean putIfEmpty(final Object item, final boolean timed, final long nanos) {
public boolean putExact(long producerIndex, final Object item, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
// final long capacity = mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long producerIndex;
long pSeqOffset;
long consumerIndex;
// long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
// consumer has to be first
consumerIndex = lvConsumerIndex(); // LoadLoad
producerIndex = lvProducerIndex(); // LoadLoad
if (consumerIndex != producerIndex) {
return false;
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
return true;
}
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
return true;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return false;
}
// another producer has moved the sequence by one, retry 2
busySpin();
// failed cas, retry 1
// } else if (delta < 0 && // poll has not moved this value forward
// producerIndex - capacity <= consumerIndex && // test against cached cIndex
// producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
//
// // Extra check required to ensure [Queue.offer == false iff queue is full]
// return false;
}
return false;
}
/**
@ -181,6 +151,45 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
}
}
/**
 * Makes a single attempt to remove the element stored in the slot addressed by {@code consumerIndex}.
 * <p>
 * The slot's sequence value is read and compared with {@code consumerIndex + 1}. Only when they are equal is a CAS of the
 * consumer index from {@code consumerIndex} to {@code consumerIndex + 1} attempted. On a successful CAS the element is read,
 * the slot is set to {@code null}, and the slot's sequence is stored as {@code consumerIndex + mask + 1}.
 * <p>
 * Unlike a looping poll, this method contains no retry loop: a sequence mismatch or a failed CAS returns {@code null}
 * immediately and the caller decides what to do next.
 *
 * @param consumerIndex the consumer index the caller expects to claim
 * @param timed not read by this method
 * @param nanos not read by this method
 * @return the element read from the slot, or {@code null} when the sequence did not match or the CAS failed
 */
public Object takeExact(long consumerIndex, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long cSeqOffset;
// long producerIndex = -1; // start with bogus value, hope we don't need it
cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
// delta == 0 means the slot's sequence equals consumerIndex + 1, the value this method requires before claiming
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
return e;
}
// failed cas: this method does not loop, so control falls through to the final "return null"
// (the strict empty check below is disabled)
// } else if (delta < 0 && // slot has not been moved by producer
// consumerIndex >= producerIndex && // test against cached pIndex
// consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
}
// sequence mismatch or lost CAS: report failure to the caller instead of retrying here
return null;
}
public Object take(final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
@ -213,15 +222,15 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// return null;
busySpin(); // empty, so busy spin
}
// another consumer beat us and moved sequence ahead, retry 2
// only producer will busy spin
busySpin();
}
}
@ -541,26 +550,26 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
return null;
}
/**
 * Reports the type of the element found at the current consumer index without removing it.
 * <p>
 * Each pass reads the consumer index and then the producer index. If they are equal, {@code TYPE_EMPTY} is returned.
 * Otherwise the element at the consumer index is read; if that read yields {@code null} the loop starts over, else the
 * value returned by {@code lpType} for that element is returned.
 *
 * @return {@code TYPE_EMPTY} when the two indices are equal, otherwise the result of {@code lpType} for the element read
 */
public int peekLast() {
long currConsumerIndex;
long currProducerIndex;
while (true) {
// consumer index is read before the producer index on every pass
currConsumerIndex = lvConsumerIndex();
currProducerIndex = lvProducerIndex();
if (currConsumerIndex == currProducerIndex) {
return TYPE_EMPTY;
}
final Object lpElementNoCast = lpElementNoCast(calcElementOffset(currConsumerIndex));
if (lpElementNoCast == null) {
// indices differ but the slot reads null: re-read the indices and try again
continue;
}
return lpType(lpElementNoCast);
}
}
// public int peekLast() {
// long currConsumerIndex;
// long currProducerIndex;
//
// while (true) {
// currConsumerIndex = lvConsumerIndex();
// currProducerIndex = lvProducerIndex();
//
// if (currConsumerIndex == currProducerIndex) {
// return TYPE_EMPTY;
// }
//
// final Object lpElementNoCast = lpElementNoCast(calcElementOffset(currConsumerIndex));
// if (lpElementNoCast == null) {
// continue;
// }
//
// return lpType(lpElementNoCast);
// }
// }
@Override
public int size() {

View File

@ -1,12 +1,10 @@
package dorkbox.util.messagebus.common.simpleq;
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
import dorkbox.util.messagebus.common.simpleq.Node;
public final class SimpleQueue {
public static final int TYPE_EMPTY = 0;
@ -133,56 +131,81 @@ public final class SimpleQueue {
final MpmcArrayTransferQueue queue = this.queue;
final MpmcArrayTransferQueue pool = this.pool;
final Thread myThread = Thread.currentThread();
Object node = null;
// local load of field to avoid repeated loads after volatile reads
final long mask = queue.mask;
final long[] sBuffer = queue.sequenceBuffer;
long cSeqOffset;
long currConsumerIndex;
long currProducerIndex;
int lastType;
while (true) {
currConsumerIndex = queue.lvConsumerIndex();
currProducerIndex = queue.lvProducerIndex();
int lastType = queue.peekLast();
if (currConsumerIndex == currProducerIndex) {
lastType = TYPE_EMPTY;
} else {
cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask);
final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currConsumerIndex + 1);
if (delta == 0) {
final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex));
if (lpElementNoCast == null) {
continue;
}
lastType = lpType(lpElementNoCast);
} else {
continue;
}
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_CONSUMER:
// case TYPE_EMPTY: {
// // empty = push+park onto queue
// final Object node = pool.take(false, 0);
//
// final Thread myThread = Thread.currentThread();
// spType(node, TYPE_PRODUCER);
// spThread(node, myThread);
// spItem1(node, item);
//
//// queue.put(node, false, 0);
// if (!queue.putIfEmpty(node, false, 0)) {
// pool.put(node, false, 0);
// continue;
// }
// park(node, myThread, false, 0);
//
// return;
// }
case TYPE_PRODUCER: {
// same mode = push+park onto queue
final Object node = pool.take(false, 0);
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
// empty or same mode = push+park onto queue
if (node == null) {
node = pool.take(false, 0);
queue.put(node, false, 0);
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
}
if (!queue.putExact(currProducerIndex, node, false, 0)) {
// whoops, inconsistent state
// busySpin2();
continue;
}
park(node, myThread, false, 0);
return;
}
// case TYPE_CONSUMER: {
// // complimentary mode = unpark+pop off queue
// final Object node = queue.take(false, 0);
//
// final Object thread = lpThread(node);
// spItem1(node, item);
// soThread(node, null);
//
// pool.put(node, false, 0);
// unpark(thread);
// return;
// }
case TYPE_CONSUMER: {
// complementary mode = pop+unpark off queue
if (node != null) {
pool.put(node, false, 0);
}
node = queue.takeExact(currConsumerIndex, false, 0);
if (node == null) {
// whoops, inconsistent state
// busySpin2();
continue;
}
soItem1(node, item);
unpark(node);
pool.put(node, false, 0);
return;
}
}
}
}
@ -195,54 +218,79 @@ public final class SimpleQueue {
final MpmcArrayTransferQueue queue = this.queue;
final MpmcArrayTransferQueue pool = this.pool;
final Thread myThread = Thread.currentThread();
Object node = null;
// local load of field to avoid repeated loads after volatile reads
final long mask = queue.mask;
final long[] sBuffer = queue.sequenceBuffer;
long cSeqOffset;
long currConsumerIndex;
long currProducerIndex;
int lastType;
while (true) {
int lastType = queue.peekLast();
currConsumerIndex = queue.lvConsumerIndex();
currProducerIndex = queue.lvProducerIndex();
if (currConsumerIndex == currProducerIndex) {
lastType = TYPE_EMPTY;
} else {
cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask);
final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currConsumerIndex + 1);
if (delta == 0) {
final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex));
if (lpElementNoCast == null) {
continue;
}
lastType = lpType(lpElementNoCast);
} else {
continue;
}
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_CONSUMER:
// case TYPE_EMPTY: {
// // empty = push+park onto queue
// final Object node = pool.take(false, 0);
//
// final Thread myThread = Thread.currentThread();
// spType(node, TYPE_CONSUMER);
// spThread(node, myThread);
//
//// queue.put(node, false, 0);
// if (!queue.putIfEmpty(node, false, 0)) {
// pool.put(node, false, 0);
// continue;
// }
// park(node, myThread, false, 0);
//
// Object lpItem1 = lpItem1(node);
// return lpItem1;
// }
// case TYPE_CONSUMER: {
// // same mode = push+park onto queue
// final Object node = pool.take(false, 0);
//
// final Thread myThread = Thread.currentThread();
// spType(node, TYPE_PRODUCER);
// spThread(node, myThread);
//
// queue.put(node, false, 0);
// park(node, myThread, false, 0);
//
// Object lpItem1 = lpItem1(node);
// return lpItem1;
// }
{
// empty or same mode = push+park onto queue
if (node == null) {
node = pool.take(false, 0);
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
}
if (!queue.putExact(currProducerIndex, node, false, 0)) {
// whoops, inconsistent state
// busySpin2();
continue;
}
park(node, myThread, false, 0);
Object item1 = lvItem1(node);
return item1;
}
case TYPE_PRODUCER: {
// complimentary mode = unpark+pop off queue
final Object node = queue.take(false, 0);
// complementary mode = pop+unpark off queue
if (node != null) {
pool.put(node, false, 0);
}
node = queue.takeExact(currConsumerIndex, false, 0);
if (node == null) {
// whoops, inconsistent state
// busySpin2();
continue;
}
final Object thread = lpThread(node);
final Object lvItem1 = lpItem1(node);
soThread(node, null);
unpark(node, thread);
unpark(node);
pool.put(node, false, 0);
return lvItem1;
@ -295,12 +343,12 @@ public final class SimpleQueue {
}
public final void park(final Object node, final Thread myThread, final boolean timed, final long nanos) throws InterruptedException {
if (casThread(node, null, myThread)) {
// if (casThread(node, null, myThread)) {
// we won against the other thread
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = SPINS;
int spins = maxTimedSpins;
// if (timed) {
// long now = System.nanoTime();
@ -315,31 +363,33 @@ public final class SimpleQueue {
// busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary)
for (;;) {
if (lvThread(node) != myThread) {
if (lpThread(node) == null) {
return;
} else if (spins > 0) {
--spins;
//// } else if (spins > negMaxUntimedSpins) {
//// --spins;
//// LockSupport.parkNanos(1);
// } else if (spins > negMaxUntimedSpins) {
// --spins;
// UNSAFE.park(false, 1L);
} else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park();
UNSAFE.park(false, 0L);
// if (myThread.isInterrupted()) {
if (myThread.isInterrupted()) {
// casThread(node, myThread, null);
// Thread.interrupted();
// throw new InterruptedException();
// }
Thread.interrupted();
throw new InterruptedException();
}
}
}
}
// }
}
public void unpark(Object node, Object thread) {
if (casThread(node, thread, Thread.currentThread())) {
} else {
UNSAFE.unpark(thread);
}
public void unpark(Object node) {
final Object thread = lpThread(node);
soThread(node, null);
UNSAFE.unpark(thread);
// if (thread != null && casThread(node, thread, Thread.currentThread())) {
// }
}
}

View File

@ -0,0 +1,149 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.LinkedTransferQueue;
import dorkbox.util.messagebus.common.simpleq.Node;
/**
 * Concurrent throughput benchmark that drives one shared {@link LinkedTransferQueue} with {@code concurrency} producer
 * threads and {@code concurrency} consumer threads.
 * <p>
 * Every producer calls {@code transfer(TEST_VALUE)} {@link #REPETITIONS} times and every consumer calls {@code take()}
 * {@link #REPETITIONS} times. A run is timed from the earliest producer start to the latest consumer end.
 */
public class LinkTransferQueueConcurrentPerfTest {
    // 15 == 32 * 1024
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);

    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    private static final int concurrency = 2;

    /** Number of timed runs executed by {@link #main(String[])}. */
    private static final int RUNS = 20;

    /** Number of trailing runs averaged into the summary line; the earlier runs are not included in the average. */
    private static final int SUMMARY_RUNS = 10;

    /**
     * Prints VM details and the {@link Node} layout, executes {@code RUNS} timed runs against a single queue instance and
     * prints the average of the last {@code SUMMARY_RUNS} results.
     *
     * @param args ignored
     * @throws Exception if a run fails, e.g. the main thread is interrupted while joining the workers
     */
    public static void main(final String[] args) throws Exception {
        System.out.println(VMSupport.vmDetails());
        System.out.println(ClassLayout.parseClass(Node.class).toPrintable());

        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
        final LinkedTransferQueue queue = new LinkedTransferQueue();

        final long[] results = new long[RUNS];
        for (int i = 0; i < RUNS; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }

        // only average last 10 results for summary
        long sum = 0;
        for (int i = RUNS - SUMMARY_RUNS; i < RUNS; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / SUMMARY_RUNS);
    }

    /**
     * Executes one timed run: starts all producer and consumer threads, waits for them to finish and derives an
     * operations-per-second figure.
     *
     * @param runNumber index of this run, used only for the printed line
     * @param queue the queue shared by all worker threads
     * @return the computed operations per second
     * @throws Exception if the calling thread is interrupted while joining a worker
     */
    private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception {
        final Producer[] producers = new Producer[concurrency];
        final Consumer[] consumers = new Consumer[concurrency];
        final Thread[] threads = new Thread[concurrency * 2];

        // threads are laid out as producer/consumer pairs: [P0, C0, P1, C1, ...]
        for (int i = 0; i < concurrency; i++) {
            producers[i] = new Producer(queue);
            consumers[i] = new Consumer(queue);
            threads[2 * i] = new Thread(producers[i], "Producer " + i);
            threads[2 * i + 1] = new Thread(consumers[i], "Consumer " + i);
        }

        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        // the measured window spans from the first producer to start until the last consumer to finish
        long start = Long.MAX_VALUE;
        long end = -1;
        for (int i = 0; i < concurrency; i++) {
            start = Math.min(start, producers[i].start);
            end = Math.max(end, consumers[i].end);
        }

        final long duration = end - start;
        // NOTE(review): each of the `concurrency` producers performs REPETITIONS transfers, but only REPETITIONS is used
        // here -- confirm whether the aggregate figure should be multiplied by `concurrency`.
        final long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        final String qName = queue.getClass().getSimpleName();

        System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
        return ops;
    }

    /** Worker that records its start time and then transfers {@link #TEST_VALUE} {@link #REPETITIONS} times. */
    public static class Producer implements Runnable {
        private final LinkedTransferQueue queue;
        /** {@code System.nanoTime()} taken immediately before the first transfer. */
        volatile long start;

        public Producer(LinkedTransferQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final LinkedTransferQueue producer = this.queue;
            int i = REPETITIONS;
            this.start = System.nanoTime();

            try {
                do {
                    producer.transfer(TEST_VALUE);
                } while (0 != --i);
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever inspects this thread after the loop is abandoned
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Worker that takes {@link #REPETITIONS} elements and then records its end time. */
    public static class Consumer implements Runnable {
        private final LinkedTransferQueue queue;
        /** Last element taken; stored so the take loop has an observable result. */
        Object result;
        /** {@code System.nanoTime()} taken after the take loop has ended. */
        volatile long end;

        public Consumer(LinkedTransferQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final LinkedTransferQueue consumer = this.queue;
            Object result = null;
            int i = REPETITIONS;

            try {
                do {
                    result = consumer.take();
                } while (0 != --i);
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever inspects this thread after the loop is abandoned
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }

            this.result = result;
            this.end = System.nanoTime();
        }
    }
}

View File

@ -24,7 +24,7 @@ import dorkbox.util.messagebus.common.simpleq.Node;
public class LinkTransferQueuePerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
@ -53,13 +53,11 @@ public class LinkTransferQueuePerfTest {
private static long performanceRun(int runNumber, LinkedTransferQueue<Integer> queue) throws Exception {
// for (int i=0;i<CONCURRENCY_LEVEL;i++) {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
// }
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
LinkedTransferQueue<Integer> consumer = queue;
LinkedTransferQueue<Integer> consumer = queue;
Object result;
int i = REPETITIONS;
int queueEmpty = 0;

View File

@ -0,0 +1,144 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue;
/**
 * Concurrent throughput benchmark that drives one shared {@link MpmcArrayTransferQueue} with {@code concurrency}
 * producer threads and {@code concurrency} consumer threads.
 * <p>
 * Every producer calls {@code put(TEST_VALUE, false, 0)} {@link #REPETITIONS} times and every consumer calls
 * {@code take(false, 0)} {@link #REPETITIONS} times. A run is timed from the earliest producer start to the latest
 * consumer end.
 */
public class MpmcQueueAltConcurrentPerfTest {
    // 15 == 32 * 1024
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);

    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    private static final int concurrency = 2;

    /** Number of timed runs executed by {@link #main(String[])}. */
    private static final int RUNS = 20;

    /** Number of trailing runs averaged into the summary line; the earlier runs are not included in the average. */
    private static final int SUMMARY_RUNS = 10;

    /**
     * Prints VM details and the {@link Node} layout, executes {@code RUNS} timed runs against a single queue instance and
     * prints the average of the last {@code SUMMARY_RUNS} results.
     *
     * @param args ignored
     * @throws Exception if a run fails, e.g. the main thread is interrupted while joining the workers
     */
    public static void main(final String[] args) throws Exception {
        System.out.println(VMSupport.vmDetails());
        System.out.println(ClassLayout.parseClass(Node.class).toPrintable());

        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
        // build the queue with the same capacity that the banner above reports (honours -Dpow2.capacity)
        final MpmcArrayTransferQueue queue = new MpmcArrayTransferQueue(QUEUE_CAPACITY);

        final long[] results = new long[RUNS];
        for (int i = 0; i < RUNS; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }

        // only average last 10 results for summary
        long sum = 0;
        for (int i = RUNS - SUMMARY_RUNS; i < RUNS; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / SUMMARY_RUNS);
    }

    /**
     * Executes one timed run: starts all producer and consumer threads, waits for them to finish and derives an
     * operations-per-second figure.
     *
     * @param runNumber index of this run, used only for the printed line
     * @param queue the queue shared by all worker threads
     * @return the computed operations per second
     * @throws Exception if the calling thread is interrupted while joining a worker
     */
    private static long performanceRun(int runNumber, MpmcArrayTransferQueue queue) throws Exception {
        final Producer[] producers = new Producer[concurrency];
        final Consumer[] consumers = new Consumer[concurrency];
        final Thread[] threads = new Thread[concurrency * 2];

        // threads are laid out as producer/consumer pairs: [P0, C0, P1, C1, ...]
        for (int i = 0; i < concurrency; i++) {
            producers[i] = new Producer(queue);
            consumers[i] = new Consumer(queue);
            threads[2 * i] = new Thread(producers[i], "Producer " + i);
            threads[2 * i + 1] = new Thread(consumers[i], "Consumer " + i);
        }

        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        // the measured window spans from the first producer to start until the last consumer to finish
        long start = Long.MAX_VALUE;
        long end = -1;
        for (int i = 0; i < concurrency; i++) {
            start = Math.min(start, producers[i].start);
            end = Math.max(end, consumers[i].end);
        }

        final long duration = end - start;
        // NOTE(review): each of the `concurrency` producers performs REPETITIONS puts, but only REPETITIONS is used
        // here -- confirm whether the aggregate figure should be multiplied by `concurrency`.
        final long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        final String qName = queue.getClass().getSimpleName();

        System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
        return ops;
    }

    /** Worker that records its start time and then puts {@link #TEST_VALUE} {@link #REPETITIONS} times. */
    public static class Producer implements Runnable {
        private final MpmcArrayTransferQueue queue;
        /** {@code System.nanoTime()} taken immediately before the first put. */
        volatile long start;

        public Producer(MpmcArrayTransferQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final MpmcArrayTransferQueue producer = this.queue;
            int i = REPETITIONS;
            this.start = System.nanoTime();

            do {
                producer.put(TEST_VALUE, false, 0);
            } while (0 != --i);
        }
    }

    /** Worker that takes {@link #REPETITIONS} elements and then records its end time. */
    public static class Consumer implements Runnable {
        private final MpmcArrayTransferQueue queue;
        /** Last element taken; stored so the take loop has an observable result. */
        Object result;
        /** {@code System.nanoTime()} taken after the take loop has ended. */
        volatile long end;

        public Consumer(MpmcArrayTransferQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final MpmcArrayTransferQueue consumer = this.queue;
            Object result = null;
            int i = REPETITIONS;

            do {
                result = consumer.take(false, 0);
            } while (0 != --i);

            this.result = result;
            this.end = System.nanoTime();
        }
    }
}

View File

@ -0,0 +1,144 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
/**
 * Concurrent throughput benchmark that drives one shared {@link MpmcArrayQueue} with {@code concurrency} producer
 * threads and {@code concurrency} consumer threads.
 * <p>
 * Every producer enqueues {@link #TEST_VALUE} {@link #REPETITIONS} times and every consumer dequeues
 * {@link #REPETITIONS} elements. A run is timed from the earliest producer start to the latest consumer end.
 */
public class MpmcQueueConcurrentPerfTest {
    // 15 == 32 * 1024
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);

    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    private static final int concurrency = 2;

    /** Number of timed runs executed by {@link #main(String[])}. */
    private static final int RUNS = 20;

    /** Number of trailing runs averaged into the summary line; the earlier runs are not included in the average. */
    private static final int SUMMARY_RUNS = 10;

    /**
     * Prints VM details and the {@link Node} layout, executes {@code RUNS} timed runs against a single queue instance and
     * prints the average of the last {@code SUMMARY_RUNS} results.
     *
     * @param args ignored
     * @throws Exception if a run fails, e.g. the main thread is interrupted while joining the workers
     */
    public static void main(final String[] args) throws Exception {
        System.out.println(VMSupport.vmDetails());
        System.out.println(ClassLayout.parseClass(Node.class).toPrintable());

        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
        // build the queue with the same capacity that the banner above reports (honours -Dpow2.capacity)
        final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY);

        final long[] results = new long[RUNS];
        for (int i = 0; i < RUNS; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }

        // only average last 10 results for summary
        long sum = 0;
        for (int i = RUNS - SUMMARY_RUNS; i < RUNS; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / SUMMARY_RUNS);
    }

    /**
     * Executes one timed run: starts all producer and consumer threads, waits for them to finish and derives an
     * operations-per-second figure.
     *
     * @param runNumber index of this run, used only for the printed line
     * @param queue the queue shared by all worker threads
     * @return the computed operations per second
     * @throws Exception if the calling thread is interrupted while joining a worker
     */
    private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception {
        final Producer[] producers = new Producer[concurrency];
        final Consumer[] consumers = new Consumer[concurrency];
        final Thread[] threads = new Thread[concurrency * 2];

        // threads are laid out as producer/consumer pairs: [P0, C0, P1, C1, ...]
        for (int i = 0; i < concurrency; i++) {
            producers[i] = new Producer(queue);
            consumers[i] = new Consumer(queue);
            threads[2 * i] = new Thread(producers[i], "Producer " + i);
            threads[2 * i + 1] = new Thread(consumers[i], "Consumer " + i);
        }

        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        // the measured window spans from the first producer to start until the last consumer to finish
        long start = Long.MAX_VALUE;
        long end = -1;
        for (int i = 0; i < concurrency; i++) {
            start = Math.min(start, producers[i].start);
            end = Math.max(end, consumers[i].end);
        }

        final long duration = end - start;
        // NOTE(review): each of the `concurrency` producers performs REPETITIONS offers, but only REPETITIONS is used
        // here -- confirm whether the aggregate figure should be multiplied by `concurrency`.
        final long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        final String qName = queue.getClass().getSimpleName();

        System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
        return ops;
    }

    /** Worker that records its start time and then enqueues {@link #TEST_VALUE} {@link #REPETITIONS} times. */
    public static class Producer implements Runnable {
        private final MpmcArrayQueue queue;
        /** {@code System.nanoTime()} taken immediately before the first offer. */
        volatile long start;

        public Producer(MpmcArrayQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final MpmcArrayQueue producer = this.queue;
            int i = REPETITIONS;
            this.start = System.nanoTime();

            do {
                // a rejected offer must not count as a completed operation: retry until the element is accepted
                while (!producer.offer(TEST_VALUE)) {
                    Thread.yield();
                }
            } while (0 != --i);
        }
    }

    /** Worker that dequeues {@link #REPETITIONS} elements and then records its end time. */
    public static class Consumer implements Runnable {
        private final MpmcArrayQueue queue;
        /** Last element polled; stored so the poll loop has an observable result. */
        Object result;
        /** {@code System.nanoTime()} taken after the poll loop has ended. */
        volatile long end;

        public Consumer(MpmcArrayQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final MpmcArrayQueue consumer = this.queue;
            Object result = null;
            int i = REPETITIONS;

            do {
                // a null poll must not count as a completed operation: retry until an element is actually received
                while (null == (result = consumer.poll())) {
                    Thread.yield();
                }
            } while (0 != --i);

            this.result = result;
            this.end = System.nanoTime();
        }
    }
}

View File

@ -1,17 +1,14 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus;
@ -19,21 +16,23 @@ import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue;
public class SimpleQueueAltPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final SimpleQueue queue = new SimpleQueue(1<<17);
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -49,31 +48,56 @@ public class SimpleQueueAltPerfTest {
}
private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
SimpleQueue consumer = queue;
Object result;
int i = REPETITIONS;
do {
result = consumer.take();
} while (0 != --i);
long end = System.nanoTime();
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
thread.join();
long duration = end - p.start;
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue);
consumers[i] = new Consumer(queue);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result);
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
return ops;
}
public static class Producer implements Runnable {
private final SimpleQueue queue;
int queueFull = 0;
long start;
volatile long start;
public Producer(SimpleQueue queue) {
this.queue = queue;
@ -83,7 +107,7 @@ public class SimpleQueueAltPerfTest {
public void run() {
SimpleQueue producer = this.queue;
int i = REPETITIONS;
long s = System.nanoTime();
this.start = System.nanoTime();
try {
do {
@ -92,7 +116,34 @@ public class SimpleQueueAltPerfTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
this.start = s;
}
}
public static class Consumer implements Runnable {
private final SimpleQueue queue;
Object result;
volatile long end;
public Consumer(SimpleQueue queue) {
this.queue = queue;
}
@Override
public void run() {
SimpleQueue consumer = this.queue;
Object result = null;
int i = REPETITIONS;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}