From 8c019590801ee7a675b1dd09da2d6543f07af8d6 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 3 May 2015 15:55:26 +0200 Subject: [PATCH] WIP, crap multiprocessor performance with simplequeue abstraction, 160k/s --- .../util/messagebus/common/simpleq/Node.java | 10 +- .../messagebus/common/simpleq/NodeState.java | 8 - .../jctools/MpmcArrayTransferQueue.java | 179 +++++++------ .../simpleq/{ => jctools}/SimpleQueue.java | 246 +++++++++++------- .../LinkTransferQueueConcurrentPerfTest.java | 149 +++++++++++ .../messagebus/LinkTransferQueuePerfTest.java | 12 +- .../MpmcQueueAltConcurrentPerfTest.java | 144 ++++++++++ .../MpmcQueueConcurrentPerfTest.java | 144 ++++++++++ .../messagebus/SimpleQueueAltPerfTest.java | 109 +++++--- 9 files changed, 770 insertions(+), 231 deletions(-) delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java rename src/main/java/dorkbox/util/messagebus/common/simpleq/{ => jctools}/SimpleQueue.java (59%) create mode 100644 src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java create mode 100644 src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java create mode 100644 src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 96b515d..204e617 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -2,6 +2,7 @@ package dorkbox.util.messagebus.common.simpleq; + // mpmc sparse.shift = 2, for this to be fast. abstract class PrePad { @@ -14,7 +15,8 @@ abstract class ColdItems extends PrePad { // public final int ID = count.getAndIncrement(); public int type = 0; -// public short type = MessageType.ONE; + +// public short messageType = MessageType.ONE; public Object item1 = null; // public Object item2 = null; // public Object item3 = null; @@ -31,11 +33,11 @@ abstract class HotItem1 extends Pad0 { abstract class Pad1 extends HotItem1 { // volatile long y0, y1, y2, y4, y5, y6 = 7L; -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; } abstract class HotItem2 extends Pad1 { - public transient volatile Thread thread; + public Thread thread; } abstract class Pad2 extends HotItem2 { @@ -50,7 +52,7 @@ abstract class HotItem3 extends Pad2 { public class Node extends HotItem3 { // post-padding // volatile long y0, y1, y2, y4, y5, y6 = 7L; -// volatile long z0, z1, z2, z4, z5, z6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java deleted file mode 100644 index 83102d9..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java +++ /dev/null @@ -1,8 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - -public class NodeState { - public static final short FREE = 0; - public static final short CANCELLED = 1; - - private NodeState() {} -} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java index e6cec3e..629dfcd 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java @@ -1,7 +1,5 @@ package dorkbox.util.messagebus.common.simpleq.jctools; -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.LockSupport; @@ -9,24 +7,6 @@ import dorkbox.util.messagebus.common.simpleq.Node; public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField { - public static final int TYPE_EMPTY = 0; - public static final int TYPE_CONSUMER = 1; - public static final int TYPE_PRODUCER = 2; - - private static final long TYPE; - - static { - try { - TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - private static final int lpType(Object node) { - return UNSAFE.getInt(node, TYPE); - } - /** The number of CPUs */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; @@ -76,56 +56,46 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField= producerIndex && // test against cached pIndex +// consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must +// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] +// return null; + } + + // another consumer beat us and moved sequence ahead, retry 2 + return null; + } + public Object take(final boolean timed, final long nanos) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; @@ -213,15 +222,15 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField= producerIndex && // test against cached pIndex - consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must + consumerIndex >= producerIndex && // test against cached pIndex + consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// return null; + // return null; busySpin(); // empty, so busy spin } // another consumer beat us and moved sequence ahead, retry 2 - // only producer will busy spin + busySpin(); } } @@ -541,26 +550,26 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField 0) { --spins; -//// } else if (spins > negMaxUntimedSpins) { -//// --spins; -//// LockSupport.parkNanos(1); +// } else if (spins > negMaxUntimedSpins) { +// --spins; +// UNSAFE.park(false, 1L); } else { // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. - LockSupport.park(); + UNSAFE.park(false, 0L); -// if (myThread.isInterrupted()) { + if (myThread.isInterrupted()) { // casThread(node, myThread, null); -// Thread.interrupted(); -// throw new InterruptedException(); -// } + Thread.interrupted(); + throw new InterruptedException(); + } } } - } +// } } - public void unpark(Object node, Object thread) { - if (casThread(node, thread, Thread.currentThread())) { - } else { - UNSAFE.unpark(thread); - } + public void unpark(Object node) { + final Object thread = lpThread(node); + soThread(node, null); + UNSAFE.unpark(thread); + +// if (thread != null && casThread(node, thread, Thread.currentThread())) { +// } } } diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java new file mode 100644 index 0000000..e50088e --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2012 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package dorkbox.util.messagebus; + +import org.openjdk.jol.info.ClassLayout; +import org.openjdk.jol.util.VMSupport; + +import dorkbox.util.messagebus.common.LinkedTransferQueue; +import dorkbox.util.messagebus.common.simpleq.Node; + +public class LinkTransferQueueConcurrentPerfTest { + // 15 == 32 * 1024 + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + + private static final int concurrency = 2; + + public static void main(final String[] args) throws Exception { + System.out.println(VMSupport.vmDetails()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); + + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final LinkedTransferQueue queue = new LinkedTransferQueue(); + + final long[] results = new long[20]; + for (int i = 0; i < 20; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last 10 results for summary + long sum = 0; + for (int i = 10; i < 20; i++) { + sum += results[i]; + } + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + } + + private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final LinkedTransferQueue queue; + volatile long start; + + public Producer(LinkedTransferQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedTransferQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + try { + do { + producer.transfer(TEST_VALUE); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final LinkedTransferQueue queue; + Object result; + volatile long end; + + public Consumer(LinkedTransferQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + LinkedTransferQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java index 1a4a86f..dda3de7 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java @@ -24,7 +24,7 @@ import dorkbox.util.messagebus.common.simpleq.Node; public class LinkTransferQueuePerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); @@ -53,13 +53,11 @@ public class LinkTransferQueuePerfTest { private static long performanceRun(int runNumber, LinkedTransferQueue queue) throws Exception { -// for (int i=0;i consumer = queue; + LinkedTransferQueue consumer = queue; Object result; int i = REPETITIONS; int queueEmpty = 0; diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java new file mode 100644 index 0000000..653ee29 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2012 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus; + +import org.openjdk.jol.info.ClassLayout; +import org.openjdk.jol.util.VMSupport; + +import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue; + +public class MpmcQueueAltConcurrentPerfTest { + // 15 == 32 * 1024 + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + + private static final int concurrency = 2; + + public static void main(final String[] args) throws Exception { + System.out.println(VMSupport.vmDetails()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); + + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final MpmcArrayTransferQueue queue = new MpmcArrayTransferQueue(1 << 17); + + final long[] results = new long[20]; + for (int i = 0; i < 20; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last 10 results for summary + long sum = 0; + for (int i = 10; i < 20; i++) { + sum += results[i]; + } + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + } + + private static long performanceRun(int runNumber, MpmcArrayTransferQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final MpmcArrayTransferQueue queue; + volatile long start; + + public Producer(MpmcArrayTransferQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcArrayTransferQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + do { + producer.put(TEST_VALUE, false, 0); + } while (0 != --i); + } + } + + public static class Consumer implements Runnable { + private final MpmcArrayTransferQueue queue; + Object result; + volatile long end; + + public Consumer(MpmcArrayTransferQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcArrayTransferQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + do { + result = consumer.take(false, 0); + } while (0 != --i); + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java new file mode 100644 index 0000000..ea63269 --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2012 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus; + +import org.openjdk.jol.info.ClassLayout; +import org.openjdk.jol.util.VMSupport; + +import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; + +public class MpmcQueueConcurrentPerfTest { + // 15 == 32 * 1024 + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + + private static final int concurrency = 2; + + public static void main(final String[] args) throws Exception { + System.out.println(VMSupport.vmDetails()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); + + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final MpmcArrayQueue queue = new MpmcArrayQueue(1 << 17); + + final long[] results = new long[20]; + for (int i = 0; i < 20; i++) { + System.gc(); + results[i] = performanceRun(i, queue); + } + // only average last 10 results for summary + long sum = 0; + for (int i = 10; i < 20; i++) { + sum += results[i]; + } + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + } + + private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; + long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; + String qName = queue.getClass().getSimpleName(); + + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); + return ops; + } + + public static class Producer implements Runnable { + private final MpmcArrayQueue queue; + volatile long start; + + public Producer(MpmcArrayQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcArrayQueue producer = this.queue; + int i = REPETITIONS; + this.start = System.nanoTime(); + + do { + producer.offer(TEST_VALUE); + } while (0 != --i); + } + } + + public static class Consumer implements Runnable { + private final MpmcArrayQueue queue; + Object result; + volatile long end; + + public Consumer(MpmcArrayQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + MpmcArrayQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + do { + result = consumer.poll(); + } while (0 != --i); + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java index 0ead259..69bdda0 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java @@ -1,17 +1,14 @@ /* * Copyright 2012 Real Logic Ltd. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may + * obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package dorkbox.util.messagebus; @@ -19,21 +16,23 @@ import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; import dorkbox.util.messagebus.common.simpleq.Node; -import dorkbox.util.messagebus.common.simpleq.SimpleQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; public class SimpleQueueAltPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + private static final int concurrency = 2; + public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - final SimpleQueue queue = new SimpleQueue(1<<17); + System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); + final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -49,31 +48,56 @@ public class SimpleQueueAltPerfTest { } private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception { - Producer p = new Producer(queue); - Thread thread = new Thread(p); - thread.start(); // producer will timestamp start - SimpleQueue consumer = queue; - Object result; - int i = REPETITIONS; - do { - result = consumer.take(); - } while (0 != --i); - long end = System.nanoTime(); + Producer[] producers = new Producer[concurrency]; + Consumer[] consumers = new Consumer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; - thread.join(); - long duration = end - p.start; + for (int i=0;i end) { + end = consumers[i].end; + } + } + + + long duration = end - start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); + System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); return ops; } public static class Producer implements Runnable { private final SimpleQueue queue; - int queueFull = 0; - long start; + volatile long start; public Producer(SimpleQueue queue) { this.queue = queue; @@ -83,7 +107,7 @@ public class SimpleQueueAltPerfTest { public void run() { SimpleQueue producer = this.queue; int i = REPETITIONS; - long s = System.nanoTime(); + this.start = System.nanoTime(); try { do { @@ -92,7 +116,34 @@ public class SimpleQueueAltPerfTest { } catch (InterruptedException e) { e.printStackTrace(); } - this.start = s; + } + } + + public static class Consumer implements Runnable { + private final SimpleQueue queue; + Object result; + volatile long end; + + public Consumer(SimpleQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + SimpleQueue consumer = this.queue; + Object result = null; + int i = REPETITIONS; + + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); } } }