From 4a42f395a684de38ba4d6e9f233992ae4d3a201a Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 13 Jan 2016 18:05:43 +0100 Subject: [PATCH] Added disruptor - AWESOME fast in speed tests. Has problems because of pub/sub lock contention when used in MessageBus --- src/com/lmax/disruptor/EventBusFactory.java | 19 + .../PublicationExceptionHandler.java | 33 ++ src/dorkbox/util/messagebus/MessageBus.java | 397 ++++++++++++------ .../util/messagebus/MessageHandler.java | 73 ++++ .../util/messagebus/MessageHolder.java | 17 + .../messagebus/error/DefaultErrorHandler.java | 2 +- .../error/ErrorHandlingSupport.java | 2 +- .../queuePerf/AbstractPerfTestDisruptor.java | 56 +++ .../queuePerf/ArrayBlockingQueue.java | 38 ++ .../queuePerf/Base_BlockingQueue.java | 187 +++++++++ .../messagebus/queuePerf/Base_Disruptor.java | 367 ++++++++++++++++ .../util/messagebus/queuePerf/Base_Queue.java | 190 +++++++++ .../queuePerf/Base_TransferQueue.java | 187 +++++++++ .../messagebus/queuePerf/Baseline_1P1C.java | 98 +++++ .../queuePerf/ConcurrentLinkedQueue.java | 25 ++ .../queuePerf/LinkedBlockingQueue.java | 36 ++ .../queuePerf/LinkedTransferQueue.java | 24 ++ .../messagebus/queuePerf/MpmcArrayQueue.java | 25 ++ .../PerfTest_MpmcArrayQueue_Concurrent.java | 2 - .../queuePerf/SynchronousQueue.java | 36 ++ .../TwoToTwoWorkProcessorThroughputTest.java | 156 +++++++ .../queuePerf/ValueAdditionWorkHandler.java | 35 ++ .../util/messagebus/queuePerf/ValueEvent.java | 41 ++ .../messagebus/queuePerf/ValuePublisher.java | 56 +++ 24 files changed, 1977 insertions(+), 125 deletions(-) create mode 100644 src/com/lmax/disruptor/EventBusFactory.java create mode 100644 src/com/lmax/disruptor/PublicationExceptionHandler.java create mode 100644 src/dorkbox/util/messagebus/MessageHandler.java create mode 100644 src/dorkbox/util/messagebus/MessageHolder.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/AbstractPerfTestDisruptor.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/ArrayBlockingQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/Base_BlockingQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/Base_Disruptor.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/Base_Queue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/Base_TransferQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/Baseline_1P1C.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/ConcurrentLinkedQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/LinkedBlockingQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/LinkedTransferQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/MpmcArrayQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/SynchronousQueue.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/TwoToTwoWorkProcessorThroughputTest.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/ValueAdditionWorkHandler.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/ValueEvent.java create mode 100644 test/dorkbox/util/messagebus/queuePerf/ValuePublisher.java diff --git a/src/com/lmax/disruptor/EventBusFactory.java b/src/com/lmax/disruptor/EventBusFactory.java new file mode 100644 index 0000000..e8628aa --- /dev/null +++ b/src/com/lmax/disruptor/EventBusFactory.java @@ -0,0 +1,19 @@ +package com.lmax.disruptor; + +import dorkbox.util.messagebus.MessageHolder; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public class EventBusFactory implements EventFactory { + + public EventBusFactory() { + } + + @Override + public + MessageHolder newInstance() { + return new MessageHolder(); + } +} diff --git a/src/com/lmax/disruptor/PublicationExceptionHandler.java b/src/com/lmax/disruptor/PublicationExceptionHandler.java new file mode 100644 index 0000000..c80e902 --- /dev/null +++ b/src/com/lmax/disruptor/PublicationExceptionHandler.java @@ -0,0 +1,33 @@ +package com.lmax.disruptor; + +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.error.PublicationError; + +public final class PublicationExceptionHandler implements ExceptionHandler { + private final ErrorHandlingSupport errorHandler; + + public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) { + this.errorHandler = errorHandler; + } + + @Override + public void handleEventException(final Throwable e, final long sequence, final T event) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")") + .setCause(e)); + } + + @Override + public void handleOnStartException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error starting the disruptor") + .setCause(e)); + } + + @Override + public void handleOnShutdownException(final Throwable e) { + this.errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error stopping the disruptor") + .setCause(e)); + } +} diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index bbd87bd..9cb418f 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -15,21 +15,36 @@ */ package dorkbox.util.messagebus; +import com.lmax.disruptor.EventBusFactory; +import com.lmax.disruptor.LiteBlockingWaitStrategy; +import com.lmax.disruptor.PublicationExceptionHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.Sequencer; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.WorkProcessor; import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.common.thread.NamedThreadFactory; import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.publication.*; +import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.publication.PublisherAll_MultiArg; +import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_FirstArg; +import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_MultiArg; +import dorkbox.util.messagebus.publication.PublisherExact_FirstArg; +import dorkbox.util.messagebus.publication.PublisherExact_MultiArg; import dorkbox.util.messagebus.subscription.FirstArgSubscriber; import dorkbox.util.messagebus.subscription.MultiArgSubscriber; import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.utils.ClassUtils; -import org.jctools.util.Pow2; -import java.util.ArrayDeque; -import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; /** * The base class for all message bus implementations with support for asynchronous message dispatch @@ -40,25 +55,30 @@ import java.util.Collection; public class MessageBus implements IMessageBus { private final ErrorHandlingSupport errorHandler; - private final MpmcMultiTransferArrayQueue dispatchQueue; +// private final ArrayBlockingQueue dispatchQueue; +// private final LinkedTransferQueue dispatchQueue; +// private final Collection threads; private final ClassUtils classUtils; private final SubscriptionManager subscriptionManager; - private final Collection threads; private final Publisher subscriptionPublisher; /** * Notifies the consumers during shutdown, that it's on purpose. */ - private volatile boolean shuttingDown; + private volatile boolean shuttingDown = false; + private WorkProcessor[] workProcessors; + private MessageHandler[] handlers; + private RingBuffer ringBuffer; + private Sequence workSequence; /** * By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages */ public MessageBus() { - this(Runtime.getRuntime().availableProcessors() / 2); + this(Runtime.getRuntime().availableProcessors()); } /** @@ -75,10 +95,12 @@ class MessageBus implements IMessageBus { */ public MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) { - numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads)); + // round to the nearest power of 2 + numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1)); this.errorHandler = new DefaultErrorHandler(); - this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads); +// this.dispatchQueue = new ArrayBlockingQueue(6); +// this.dispatchQueue = new LinkedTransferQueue(); classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); final StampedLock lock = new StampedLock(); @@ -124,79 +146,168 @@ class MessageBus implements IMessageBus { } this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock); - this.threads = new ArrayDeque(numberOfThreads); - NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); - for (int i = 0; i < numberOfThreads; i++) { - // each thread will run forever and process incoming message publication requests - Runnable runnable = new Runnable() { - @Override - public - void run() { - MpmcMultiTransferArrayQueue IN_QUEUE = MessageBus.this.dispatchQueue; - MultiNode node = new MultiNode(); - while (!MessageBus.this.shuttingDown) { - try { - //noinspection InfiniteLoopStatement - while (true) { - IN_QUEUE.take(node); - Integer type = (Integer) MultiNode.lpMessageType(node); - switch (type) { - case 1: { - publish(MultiNode.lpItem1(node)); - break; - } - case 2: { - publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node)); - break; - } - case 3: { - publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)); - break; - } - default: { - publish(MultiNode.lpItem1(node)); - } - } - } - } catch (InterruptedException e) { - if (!MessageBus.this.shuttingDown) { - Integer type = (Integer) MultiNode.lpMessageType(node); - switch (type) { - case 1: { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Thread interrupted while processing message").setCause(e).setPublishedObject( - MultiNode.lpItem1(node))); - break; - } - case 2: { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Thread interrupted while processing message").setCause(e).setPublishedObject( - MultiNode.lpItem1(node), MultiNode.lpItem2(node))); - break; - } - case 3: { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Thread interrupted while processing message").setCause(e).setPublishedObject( - MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node))); - break; - } - default: { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Thread interrupted while processing message").setCause(e).setPublishedObject( - MultiNode.lpItem1(node))); - } - } - } - } - } - } - }; + // Now we setup the disruptor and work handlers - Thread runner = dispatchThreadFactory.newThread(runnable); - this.threads.add(runner); + ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, + 0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter + new java.util.concurrent.LinkedTransferQueue(), + new NamedThreadFactory("MessageBus")); + + final PublicationExceptionHandler exceptionHandler = new PublicationExceptionHandler(errorHandler); + EventBusFactory factory = new EventBusFactory(); + + // setup the work handlers + handlers = new MessageHandler[numberOfThreads]; + for (int i = 0; i < handlers.length; i++) { + handlers[i] = new MessageHandler(subscriptionPublisher); // exactly one per thread is used } + + +// final int BUFFER_SIZE = ringBufferSize * 64; +// final int BUFFER_SIZE = 1024 * 64; +// final int BUFFER_SIZE = 1024; + final int BUFFER_SIZE = 8; + + + WaitStrategy consumerWaitStrategy; + consumerWaitStrategy = new LiteBlockingWaitStrategy(); +// consumerWaitStrategy = new BlockingWaitStrategy(); +// consumerWaitStrategy = new YieldingWaitStrategy(); +// consumerWaitStrategy = new BusySpinWaitStrategy(); +// consumerWaitStrategy = new SleepingWaitStrategy(); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); + + + ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + + // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item + final int numWorkers = handlers.length; + workProcessors = new WorkProcessor[numWorkers]; + workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + for (int i = 0; i < numWorkers; i++) { + workProcessors[i] = new WorkProcessor(ringBuffer, + sequenceBarrier, + handlers[i], + exceptionHandler, workSequence); + } + + // setup the WorkProcessor sequences (control what is consumed from the ring buffer) + final Sequence[] sequences = getSequences(); + ringBuffer.addGatingSequences(sequences); + + + // configure the start position for the WorkProcessors, and start them + final long cursor = ringBuffer.getCursor(); + workSequence.set(cursor); + + for (WorkProcessor processor : workProcessors) { + processor.getSequence() + .set(cursor); + executor.execute(processor); + } + + + +// this.threads = new ArrayDeque(numberOfThreads); +// final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); +// for (int i = 0; i < numberOfThreads; i++) { +// +// // each thread will run forever and process incoming message publication requests +// Runnable runnable = new Runnable() { +// @Override +// public +// void run() { +// ArrayBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; +//// LinkedTransferQueue IN_QUEUE = MessageBus.this.dispatchQueue; +// +// MultiNode node = new MultiNode(); +// while (!MessageBus.this.shuttingDown) { +// try { +// //noinspection InfiniteLoopStatement +// while (true) { +//// IN_QUEUE.take(node); +// final Object take = IN_QUEUE.take(); +//// Integer type = (Integer) MultiNode.lpMessageType(node); +//// switch (type) { +//// case 1: { +// publish(take); +//// break; +//// } +//// case 2: { +//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node)); +//// break; +//// } +//// case 3: { +//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)); +//// break; +//// } +//// default: { +//// publish(MultiNode.lpItem1(node)); +//// } +//// } +// } +// } catch (InterruptedException e) { +// if (!MessageBus.this.shuttingDown) { +// Integer type = (Integer) MultiNode.lpMessageType(node); +// switch (type) { +// case 1: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// break; +// } +// case 2: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node))); +// break; +// } +// case 3: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node), +// MultiNode.lpItem3(node))); +// break; +// } +// default: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// } +// } +// } +// } +// } +// } +// }; +// +// Thread runner = threadFactory.newThread(runnable); +// this.threads.add(runner); +// } + } + + // gets the sequences used for processing work + private + Sequence[] getSequences() { + final Sequence[] sequences = new Sequence[workProcessors.length + 1]; + for (int i = 0, size = workProcessors.length; i < size; i++) { + sequences[i] = workProcessors[i].getSequence(); + } + sequences[sequences.length - 1] = workSequence; // always add the work sequence + return sequences; } /** @@ -250,70 +361,96 @@ class MessageBus implements IMessageBus { public void publishAsync(final Object message) { if (message != null) { + final long seq = ringBuffer.next(); + try { - this.dispatchQueue.transfer(message, MessageType.ONE); + MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.ONE; + job.message1 = message; } catch (Exception e) { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Error while adding an asynchronous message").setCause(e).setPublishedObject(message)); + errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } finally { + // always publish the job + ringBuffer.publish(seq); } } else { throw new NullPointerException("Message cannot be null."); } + +// try { +// this.dispatchQueue.put(message); +// } catch (Exception e) { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message)); +// } } @Override public void publishAsync(final Object message1, final Object message2) { - if (message1 != null && message2 != null) { - try { - this.dispatchQueue.transfer(message1, message2); - } catch (Exception e) { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2)); - } - } - else { - throw new NullPointerException("Messages cannot be null."); - } +// if (message1 != null && message2 != null) { +// try { +// this.dispatchQueue.transfer(message1, message2); +// } catch (Exception e) { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2)); +// } +// } +// else { +// throw new NullPointerException("Messages cannot be null."); +// } } @Override public void publishAsync(final Object message1, final Object message2, final Object message3) { - if (message1 != null || message2 != null | message3 != null) { - try { - this.dispatchQueue.transfer(message1, message2, message3); - } catch (Exception e) { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2, message3)); - } - } - else { - throw new NullPointerException("Messages cannot be null."); - } +// if (message1 != null || message2 != null | message3 != null) { +// try { +// this.dispatchQueue.transfer(message1, message2, message3); +// } catch (Exception e) { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2, message3)); +// } +// } +// else { +// throw new NullPointerException("Messages cannot be null."); +// } } @Override public void publishAsync(final Object[] messages) { - if (messages != null) { - try { - this.dispatchQueue.transfer(messages, MessageType.ARRAY); - } catch (Exception e) { - errorHandler.handlePublicationError(new PublicationError().setMessage( - "Error while adding an asynchronous message").setCause(e).setPublishedObject(messages)); - } - } - else { - throw new NullPointerException("Message cannot be null."); - } +// if (messages != null) { +// try { +// this.dispatchQueue.transfer(messages, MessageType.ARRAY); +// } catch (Exception e) { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Error while adding an asynchronous message").setCause(e).setPublishedObject(messages)); +// } +// } +// else { +// throw new NullPointerException("Message cannot be null."); +// } } @Override public final boolean hasPendingMessages() { - return this.dispatchQueue.hasPendingMessages(); + // from workerPool.drainAndHalt() + Sequence[] workerSequences = getSequences(); + final long cursor = ringBuffer.getCursor(); + for (Sequence s : workerSequences) { + if (cursor > s.get()) { + return true; + } + } + + return false; + +// return !this.dispatchQueue.isEmpty(); } @Override @@ -325,21 +462,39 @@ class MessageBus implements IMessageBus { @Override public void start() { - for (Thread t : this.threads) { - t.start(); + if (shuttingDown) { + throw new Error("Unable to restart the MessageBus"); } + errorHandler.init(); + +// for (Thread t : this.threads) { +// t.start(); +// } - errorHandler.start(); } @Override public void shutdown() { this.shuttingDown = true; - for (Thread t : this.threads) { - t.interrupt(); + +// for (Thread t : this.threads) { +// t.interrupt(); +// } + + for (WorkProcessor processor : workProcessors) { + processor.halt(); } + + for (MessageHandler handler : handlers) { + while (!handler.isShutdown()) { + LockSupport.parkNanos(100L); // wait 100ms for handlers to quit + } + } + this.subscriptionManager.shutdown(); this.classUtils.clear(); } + + } diff --git a/src/dorkbox/util/messagebus/MessageHandler.java b/src/dorkbox/util/messagebus/MessageHandler.java new file mode 100644 index 0000000..2649cbc --- /dev/null +++ b/src/dorkbox/util/messagebus/MessageHandler.java @@ -0,0 +1,73 @@ +package dorkbox.util.messagebus; + +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.WorkHandler; +import dorkbox.util.messagebus.publication.Publisher; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public +class MessageHandler implements WorkHandler, LifecycleAware { + private final Publisher publisher; + + AtomicBoolean shutdown = new AtomicBoolean(false); + + public + MessageHandler(Publisher publisher) { + this.publisher = publisher; + } + + @Override + public + void onEvent(final MessageHolder event) throws Exception { + final int messageType = event.type; + switch (messageType) { + case MessageType.ONE: { + Object message1 = event.message1; +// System.err.println("(" + sequence + ")" + message1); +// this.workProcessor.release(sequence); + this.publisher.publish(message1); + return; + } + case MessageType.TWO: { + Object message1 = event.message1; + Object message2 = event.message2; + this.publisher.publish(message1, message2); + return; + } + case MessageType.THREE: { + Object message1 = event.message1; + Object message2 = event.message2; + Object message3 = event.message3; + this.publisher.publish(message1, message2, message3); + return; + } + case MessageType.ARRAY: { + Object[] messages = event.messages; + this.publisher.publish(messages); + return; + } + } + } + + @Override + public + void onStart() { + + } + + @Override + public synchronized + void onShutdown() { + shutdown.set(true); + } + + public + boolean isShutdown() { + return shutdown.get(); + } +} diff --git a/src/dorkbox/util/messagebus/MessageHolder.java b/src/dorkbox/util/messagebus/MessageHolder.java new file mode 100644 index 0000000..944afba --- /dev/null +++ b/src/dorkbox/util/messagebus/MessageHolder.java @@ -0,0 +1,17 @@ +package dorkbox.util.messagebus; + +/** + * @author dorkbox, llc Date: 2/2/15 + */ +public +class MessageHolder { + public int type = MessageType.ONE; + + public Object message1 = null; + public Object message2 = null; + public Object message3 = null; + public Object[] messages = null; + + public + MessageHolder() {} +} diff --git a/src/dorkbox/util/messagebus/error/DefaultErrorHandler.java b/src/dorkbox/util/messagebus/error/DefaultErrorHandler.java index 87610f3..7677626 100644 --- a/src/dorkbox/util/messagebus/error/DefaultErrorHandler.java +++ b/src/dorkbox/util/messagebus/error/DefaultErrorHandler.java @@ -64,7 +64,7 @@ class DefaultErrorHandler implements ErrorHandlingSupport { @Override public - void start() { + void init() { synchronized (this.errorHandlers) { if (this.errorHandlers.isEmpty()) { this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger()); diff --git a/src/dorkbox/util/messagebus/error/ErrorHandlingSupport.java b/src/dorkbox/util/messagebus/error/ErrorHandlingSupport.java index bcc062e..cc04452 100644 --- a/src/dorkbox/util/messagebus/error/ErrorHandlingSupport.java +++ b/src/dorkbox/util/messagebus/error/ErrorHandlingSupport.java @@ -57,5 +57,5 @@ interface ErrorHandlingSupport { void handleError(String error, Class listenerClass); - void start(); + void init(); } diff --git a/test/dorkbox/util/messagebus/queuePerf/AbstractPerfTestDisruptor.java b/test/dorkbox/util/messagebus/queuePerf/AbstractPerfTestDisruptor.java new file mode 100644 index 0000000..15f92d3 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/AbstractPerfTestDisruptor.java @@ -0,0 +1,56 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus.queuePerf; + + +public abstract class AbstractPerfTestDisruptor +{ + public static final int RUNS = 7; + + protected void testImplementations() + throws Exception + { + final int availableProcessors = Runtime.getRuntime().availableProcessors(); + if (getRequiredProcessorCount() > availableProcessors) + { + System.out.print("*** Warning ***: your system has insufficient processors to execute the test efficiently. "); + System.out.println("Processors required = " + getRequiredProcessorCount() + " available = " + availableProcessors); + } + + long[] disruptorOps = new long[RUNS]; + + System.out.println("Starting Disruptor tests"); + for (int i = 0; i < RUNS; i++) + { + System.gc(); + disruptorOps[i] = runDisruptorPass(); + System.out.format("Run %d, Disruptor=%,d ops/sec%n", i, Long.valueOf(disruptorOps[i])); + } + } + + public static void printResults(final String className, final long[] disruptorOps, final long[] queueOps) + { + for (int i = 0; i < RUNS; i++) + { + System.out.format("%s run %d: BlockingQueue=%,d Disruptor=%,d ops/sec\n", + className, Integer.valueOf(i), Long.valueOf(queueOps[i]), Long.valueOf(disruptorOps[i])); + } + } + + protected abstract int getRequiredProcessorCount(); + + protected abstract long runDisruptorPass() throws Exception; +} diff --git a/test/dorkbox/util/messagebus/queuePerf/ArrayBlockingQueue.java b/test/dorkbox/util/messagebus/queuePerf/ArrayBlockingQueue.java new file mode 100644 index 0000000..4a98718 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/ArrayBlockingQueue.java @@ -0,0 +1,38 @@ +package dorkbox.util.messagebus.queuePerf; + +@SuppressWarnings("Duplicates") +public class ArrayBlockingQueue extends Base_BlockingQueue { + + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 3; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s: \n", REPETITIONS, ArrayBlockingQueue.class.getSimpleName()); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.ArrayBlockingQueue queue = new java.util.concurrent.ArrayBlockingQueue(1024); + final Integer initialValue = Integer.valueOf(777); + new ABQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + +// System.out.println(""); +// System.out.println(""); +// +// for (int concurrency = 1; concurrency < 5; concurrency++) { +// final java.util.concurrent.ArrayBlockingQueue queue = new java.util.concurrent.ArrayBlockingQueue(1024); +// final Integer initialValue = Integer.valueOf(777); +// new ABQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, +// initialValue); +// } + } + + static + class ABQ_Block extends Base_BlockingQueue {} + + static + class ABQ_NonBlock extends Base_Queue {} +} diff --git a/test/dorkbox/util/messagebus/queuePerf/Base_BlockingQueue.java b/test/dorkbox/util/messagebus/queuePerf/Base_BlockingQueue.java new file mode 100644 index 0000000..a491ee8 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/Base_BlockingQueue.java @@ -0,0 +1,187 @@ +package dorkbox.util.messagebus.queuePerf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +@SuppressWarnings("Duplicates") +public +class Base_BlockingQueue { + + public + void run(final int repetitions, + final int producersCount, + final int consumersCount, + final int warmups, + final int runs, + final int bestRunsToAverage, + final boolean showStats, + final BlockingQueue queue, + final T initialValue) throws Exception { + + for (int i = 0; i < warmups; i++) { + performanceRun(i, queue, + false, producersCount, consumersCount, repetitions, initialValue); + } + + final Long[] results = new Long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue); + } + + // average best results for summary + List list = Arrays.asList(results); + Collections.sort(list); + + long sum = 0; + // ignore the highest one + int limit = runs - 1; + for (int i = limit - bestRunsToAverage; i < limit; i++) { + sum += list.get(i); + } + + long average = sum / bestRunsToAverage; + System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(), + producersCount, consumersCount, average); + } + + + /** + * Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to + * a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent. + */ + private + long performanceRun(final int runNumber, + final BlockingQueue queue, + final boolean showStats, + final int producersCount, + final int consumersCount, + int repetitions, + T initialValue) throws Exception { + + + // make sure it's evenly divisible by both producers and consumers + final int adjusted = repetitions / producersCount / consumersCount; + + int pRepetitions = adjusted * producersCount; + int cRepetitions = adjusted * consumersCount; + + + Producer[] producers = new Producer[producersCount]; + Consumer[] consumers = new Consumer[consumersCount]; + + Thread[] pThreads = new Thread[producersCount]; + Thread[] cThreads = new Thread[consumersCount]; + + for (int i=0;i(queue, pRepetitions, initialValue); + } + for (int i=0;i(queue, cRepetitions); + } + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + long duration = end - start; + long ops = repetitions * 1000000000L / duration; + + if (showStats) { + System.out.format("%d - ops/sec=%,d\n", runNumber, ops); + } + return ops; + } + + public static class Producer implements Runnable { + private final BlockingQueue queue; + volatile long start; + private int repetitions; + private final T initialValue; + + public Producer(BlockingQueue queue, int repetitions, T initialValue) { + this.queue = queue; + this.repetitions = repetitions; + this.initialValue = initialValue; + } + + @Override + public void run() { + BlockingQueue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + final T initialValue = this.initialValue; + + try { + do { + producer.put(initialValue); + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final BlockingQueue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(BlockingQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + public void run() { + BlockingQueue consumer = this.queue; + int i = this.repetitions; + + T result = null; + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/Base_Disruptor.java b/test/dorkbox/util/messagebus/queuePerf/Base_Disruptor.java new file mode 100644 index 0000000..2cf18a4 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/Base_Disruptor.java @@ -0,0 +1,367 @@ +package dorkbox.util.messagebus.queuePerf; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.util.DaemonThreadFactory; +import org.jctools.util.Pow2; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +@SuppressWarnings("Duplicates") +public +class Base_Disruptor { + + private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); + private static boolean SHOW = true; + + // must be power of 2. + private final int ringBufferSize = Pow2.roundToPowerOfTwo(AVAILABLE_PROCESSORS * 32); + + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 0; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s: \n", REPETITIONS, "Disruptor"); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final Integer initialValue = Integer.valueOf(777); + new Base_Disruptor().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, +// null, + initialValue); + } + } + + + + public + void run(final int repetitions, + final int producersCount, + final int consumersCount, + final int warmups, + final int runs, + final int bestRunsToAverage, + final boolean showStats, +// final TransferQueue queue, + final T initialValue) throws Exception { + +// for (int i = 0; i < warmups; i++) { +// performanceRun(i, ringBuffer, +// false, producersCount, consumersCount, handlers, workerPool, repetitions, initialValue); +// } + + final Long[] results = new Long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, showStats, producersCount, consumersCount, repetitions, + initialValue); + } + + // average best results for summary + List list = Arrays.asList(results); + Collections.sort(list); + + long sum = 0; + // ignore the highest one + int limit = runs - 1; + for (int i = limit - bestRunsToAverage; i < limit; i++) { + sum += list.get(i); + } + + long average = sum / bestRunsToAverage; + System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), +// queue.getClass().getSimpleName(), + "Disruptor", + producersCount, consumersCount, average); + } + + + /** + * Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to + * a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent. + */ + private + long performanceRun(final int runNumber, + final boolean showStats, + final int producersCount, + final int consumersCount, + int repetitions, + T initialValue) throws Exception { + + // make sure it's evenly divisible by both producers and consumers + final int adjusted = repetitions / producersCount / consumersCount; + + int pRepetitions = adjusted * producersCount; + +// final int BUFFER_SIZE = ringBufferSize * 64; +// final int BUFFER_SIZE = 1024 * 64; + final int BUFFER_SIZE = 1024; + + ExecutorService executor = new ThreadPoolExecutor(consumersCount, consumersCount, 0, + TimeUnit.NANOSECONDS, + new java.util.concurrent.LinkedTransferQueue(), + DaemonThreadFactory.INSTANCE); + + final PubExceptionHandler exceptionHandler = new PubExceptionHandler(); + ValueFactory factory = new ValueFactory(); + + WorkHandler> handlers[] = new EventHandler[consumersCount]; + for (int i = 0; i < handlers.length; i++) { + handlers[i] = new EventHandler(); // exactly one per thread + } + + WaitStrategy consumerWaitStrategy; + consumerWaitStrategy = new LiteBlockingWaitStrategy(); +// consumerWaitStrategy = new BlockingWaitStrategy(); +// consumerWaitStrategy = new YieldingWaitStrategy(); +// consumerWaitStrategy = new BusySpinWaitStrategy(); +// consumerWaitStrategy = new SleepingWaitStrategy(); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); + + + if (SHOW) { + SHOW = false; + System.err.println(BUFFER_SIZE + " : LiteBlockingWaitStrategy)"); + } + + + RingBuffer> ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + + // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item + final int numWorkers = handlers.length; + WorkProcessor[] workProcessors = new WorkProcessor[numWorkers]; + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + for (int i = 0; i < numWorkers; i++) { + workProcessors[i] = new WorkProcessor>(ringBuffer, + sequenceBarrier, + handlers[i], + exceptionHandler, + workSequence); + } + + // setup the WorkProcessor sequences (control what is consumed from the ring buffer) + final Sequence[] sequences = new Sequence[workProcessors.length + 1]; + for (int i = 0, size = workProcessors.length; i < size; i++) { + sequences[i] = workProcessors[i].getSequence(); + } + sequences[sequences.length - 1] = workSequence; // always add the work sequence + ringBuffer.addGatingSequences(sequences); + + + + + // configure the start position for the WorkProcessors, and start them + final long cursor = ringBuffer.getCursor(); + long expected = cursor + (producersCount * pRepetitions); // saved so we know when it is finished producing events + workSequence.set(cursor); + + for (WorkProcessor processor : workProcessors) { + processor.getSequence() + .set(cursor); + executor.execute(processor); + } + + + + Producer[] producers = new Producer[producersCount]; + Thread[] pThreads = new Thread[producersCount]; + + for (int i = 0; i < producersCount; i++) { + producers[i] = new Producer(ringBuffer, pRepetitions, initialValue); + } + + for (int i = 0; i < producersCount; i++) { + pThreads[i] = new Thread(producers[i], "Producer " + i); + } + + + + for (int i = 0; i < producersCount; i++) { + pThreads[i].start(); + } + + for (int i = 0; i < producersCount; i++) { + pThreads[i].join(); + } + + + + while (workSequence.get() < expected) { + LockSupport.parkNanos(1L); + } + + for (WorkProcessor processor : workProcessors) { + processor.halt(); + } + + + + + for (int i=0;i 0) { + end = end1; + } + } + + long duration = end - start; + long ops = repetitions * 1000000000L / duration; + + if (showStats) { + System.out.format("%d - ops/sec=%,d\n", runNumber, ops); + } + return ops; + } + + + public class Producer implements Runnable { + private final RingBuffer> queue; + volatile long start; + private int repetitions; + private final T initialValue; + + public Producer(RingBuffer> queue, int repetitions, T initialValue) { + this.queue = queue; + this.repetitions = repetitions; + this.initialValue = initialValue; + } + + @Override + public void run() { + RingBuffer> producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + final T initialValue = this.initialValue; + + try { + do { + // setup the job + final long seq = producer.next(); +// try { + ValueHolder eventJob = producer.get(seq); + eventJob.item = initialValue; +// } finally { + // always publish the job + producer.publish(seq); +// } + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public final class PubExceptionHandler implements ExceptionHandler { + + public + PubExceptionHandler() { + } + + @Override + public void handleEventException(final Throwable e, final long sequence, final Object event) { + System.err.println("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ") " + e.getMessage()); + } + + @Override + public void handleOnStartException(final Throwable e) { + System.err.println("Error starting the disruptor " + e.getMessage()); + } + + @Override + public void handleOnShutdownException(final Throwable e) { + System.err.println("Error stopping the disruptor " + e.getMessage()); + } + } + + + class ValueFactory implements EventFactory> { + + public ValueFactory() { + } + + @Override + public ValueHolder newInstance() { + return new ValueHolder(); + } + } + + class ValueHolder { + + public T item = null; + + public ValueHolder() {} + } + + + class EventHandler implements WorkHandler>, LifecycleAware{ + public long count = 0; + AtomicBoolean shutdown = new AtomicBoolean(false); + private long end = 0; + + public + EventHandler() { + } + + + public synchronized + long getEnd() { + return end; + } + + @Override + public + void onEvent(final ValueHolder event) throws Exception { +// count += 1; + end = System.nanoTime(); + } + + @Override + public + void onStart() { + } + + @Override + public synchronized + void onShutdown() { +// count -= count; + end = System.nanoTime(); +// end += count; + shutdown.set(true); + } + + public + boolean isShutdown() { + return shutdown.get(); + } + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/Base_Queue.java b/test/dorkbox/util/messagebus/queuePerf/Base_Queue.java new file mode 100644 index 0000000..1ea1298 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/Base_Queue.java @@ -0,0 +1,190 @@ +package dorkbox.util.messagebus.queuePerf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; + +@SuppressWarnings("Duplicates") +public +class Base_Queue { + + public + void run(final int repetitions, + final int producersCount, + final int consumersCount, + final int warmups, + final int runs, + final int bestRunsToAverage, + final boolean showStats, + final Queue queue, + final T initialValue) throws Exception { + + for (int i = 0; i < warmups; i++) { + performanceRun(i, queue, + false, producersCount, consumersCount, repetitions, initialValue); + } + + final Long[] results = new Long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue); + } + + // average best results for summary + List list = Arrays.asList(results); + Collections.sort(list); + + long sum = 0; + // ignore the highest one + int limit = runs - 1; + for (int i = limit - bestRunsToAverage; i < limit; i++) { + sum += list.get(i); + } + + long average = sum / bestRunsToAverage; + System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(), + producersCount, + consumersCount, average); + } + + + /** + * Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to + * a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent. + */ + private + long performanceRun(final int runNumber, + final Queue queue, + final boolean showStats, + final int producersCount, + final int consumersCount, + int repetitions, + T initialValue) throws Exception { + + + // make sure it's evenly divisible by both producers and consumers + final int adjusted = repetitions / producersCount / consumersCount; + + int pRepetitions = adjusted * producersCount; + int cRepetitions = adjusted * consumersCount; + + + Producer[] producers = new Producer[producersCount]; + Consumer[] consumers = new Consumer[consumersCount]; + + Thread[] pThreads = new Thread[producersCount]; + Thread[] cThreads = new Thread[consumersCount]; + + for (int i=0;i(queue, pRepetitions, initialValue); + } + for (int i=0;i(queue, cRepetitions); + } + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + long duration = end - start; + long ops = repetitions * 1000000000L / duration; + + if (showStats) { + System.out.format("%d - ops/sec=%,d\n", runNumber, ops); + } + return ops; + } + + public static class Producer implements Runnable { + private final Queue queue; + volatile long start; + private int repetitions; + private final T initialValue; + + public Producer(Queue queue, int repetitions, T initialValue) { + this.queue = queue; + this.repetitions = repetitions; + this.initialValue = initialValue; + } + + @Override + public void run() { + Queue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + final T initialValue = this.initialValue; + + try { + do { + while (!producer.offer(initialValue)) { + } + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final Queue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(Queue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + public void run() { + Queue consumer = this.queue; + int i = this.repetitions; + + T result = null; + try { + do { + while (null == (result = consumer.poll())) { + } + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/Base_TransferQueue.java b/test/dorkbox/util/messagebus/queuePerf/Base_TransferQueue.java new file mode 100644 index 0000000..faf1e28 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/Base_TransferQueue.java @@ -0,0 +1,187 @@ +package dorkbox.util.messagebus.queuePerf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TransferQueue; + +@SuppressWarnings("Duplicates") +public +class Base_TransferQueue { + + public + void run(final int repetitions, + final int producersCount, + final int consumersCount, + final int warmups, + final int runs, + final int bestRunsToAverage, + final boolean showStats, + final TransferQueue queue, + final T initialValue) throws Exception { + + for (int i = 0; i < warmups; i++) { + performanceRun(i, queue, + false, producersCount, consumersCount, repetitions, initialValue); + } + + final Long[] results = new Long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue); + } + + // average best results for summary + List list = Arrays.asList(results); + Collections.sort(list); + + long sum = 0; + // ignore the highest one + int limit = runs - 1; + for (int i = limit - bestRunsToAverage; i < limit; i++) { + sum += list.get(i); + } + + long average = sum / bestRunsToAverage; + System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(), + producersCount, consumersCount, average); + } + + + /** + * Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to + * a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent. + */ + private + long performanceRun(final int runNumber, + final TransferQueue queue, + final boolean showStats, + final int producersCount, + final int consumersCount, + int repetitions, + T initialValue) throws Exception { + + + // make sure it's evenly divisible by both producers and consumers + final int adjusted = repetitions / producersCount / consumersCount; + + int pRepetitions = adjusted * producersCount; + int cRepetitions = adjusted * consumersCount; + + + Producer[] producers = new Producer[producersCount]; + Consumer[] consumers = new Consumer[consumersCount]; + + Thread[] pThreads = new Thread[producersCount]; + Thread[] cThreads = new Thread[consumersCount]; + + for (int i=0;i(queue, pRepetitions, initialValue); + } + for (int i=0;i(queue, cRepetitions); + } + + for (int i=0;i end) { + end = consumers[i].end; + } + } + + long duration = end - start; + long ops = repetitions * 1000000000L / duration; + + if (showStats) { + System.out.format("%d - ops/sec=%,d\n", runNumber, ops); + } + return ops; + } + + public static class Producer implements Runnable { + private final TransferQueue queue; + volatile long start; + private int repetitions; + private final T initialValue; + + public Producer(TransferQueue queue, int repetitions, T initialValue) { + this.queue = queue; + this.repetitions = repetitions; + this.initialValue = initialValue; + } + + @Override + public void run() { + TransferQueue producer = this.queue; + int i = this.repetitions; + this.start = System.nanoTime(); + final T initialValue = this.initialValue; + + try { + do { + producer.transfer(initialValue); + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public static class Consumer implements Runnable { + private final TransferQueue queue; + Object result; + volatile long end; + private int repetitions; + + public Consumer(TransferQueue queue, int repetitions) { + this.queue = queue; + this.repetitions = repetitions; + } + + public void run() { + TransferQueue consumer = this.queue; + int i = this.repetitions; + + T result = null; + try { + do { + result = consumer.take(); + } while (0 != --i); + } catch (Exception e) { + e.printStackTrace(); + } + + this.result = result; + this.end = System.nanoTime(); + } + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/Baseline_1P1C.java b/test/dorkbox/util/messagebus/queuePerf/Baseline_1P1C.java new file mode 100644 index 0000000..1ceece0 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/Baseline_1P1C.java @@ -0,0 +1,98 @@ +package dorkbox.util.messagebus.queuePerf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; + +@SuppressWarnings("Duplicates") +public +class Baseline_1P1C { + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 43; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s: \n", REPETITIONS, Baseline_1P1C.class.getSimpleName()); + + final java.util.concurrent.ConcurrentLinkedQueue queue = new java.util.concurrent.ConcurrentLinkedQueue(); + final Integer initialValue = Integer.valueOf(777); + new Baseline_1P1C().run(REPETITIONS, 1, 1, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + + + public + void run(final int repetitions, + final int producersCount, + final int consumersCount, + final int warmups, + final int runs, + final int bestRunsToAverage, + final boolean showStats, + final Queue queue, + final T initialValue) throws Exception { + + for (int i = 0; i < warmups; i++) { + performanceRun(i, queue, + false, producersCount, consumersCount, repetitions, initialValue); + } + + final Long[] results = new Long[runs]; + for (int i = 0; i < runs; i++) { + System.gc(); + results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue); + } + + // average best results for summary + List list = Arrays.asList(results); + Collections.sort(list); + + long sum = 0; + // ignore the highest one + int limit = runs - 1; + for (int i = limit - bestRunsToAverage; i < limit; i++) { + sum += list.get(i); + } + + long average = sum / bestRunsToAverage; + System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(), + producersCount, + consumersCount, average); + } + + + /** + * Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to + * a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent. + */ + private + long performanceRun(final int runNumber, + final Queue queue, + final boolean showStats, + final int producersCount, + final int consumersCount, + int repetitions, + T initialValue) throws Exception { + + // this just measure how long it takes to count from 0 - repetitions + long start = System.nanoTime(); + long end = -1; + + for (int i=0;i {} +} diff --git a/test/dorkbox/util/messagebus/queuePerf/LinkedBlockingQueue.java b/test/dorkbox/util/messagebus/queuePerf/LinkedBlockingQueue.java new file mode 100644 index 0000000..1655a2e --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/LinkedBlockingQueue.java @@ -0,0 +1,36 @@ +package dorkbox.util.messagebus.queuePerf; + +public class LinkedBlockingQueue { + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 3; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s\n", REPETITIONS, LinkedBlockingQueue.class.getSimpleName()); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.LinkedBlockingQueue queue = new java.util.concurrent.LinkedBlockingQueue(1024); + final Integer initialValue = Integer.valueOf(777); + new LBQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + + System.out.println(""); + System.out.println(""); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.LinkedBlockingQueue queue = new java.util.concurrent.LinkedBlockingQueue(1024); + final Integer initialValue = Integer.valueOf(777); + new LBQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + } + + static + class LBQ_Block extends Base_BlockingQueue {} + + static + class LBQ_NonBlock extends Base_Queue {} +} diff --git a/test/dorkbox/util/messagebus/queuePerf/LinkedTransferQueue.java b/test/dorkbox/util/messagebus/queuePerf/LinkedTransferQueue.java new file mode 100644 index 0000000..ee1b9f7 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/LinkedTransferQueue.java @@ -0,0 +1,24 @@ +package dorkbox.util.messagebus.queuePerf; + +public class LinkedTransferQueue { + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 3; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s\n", REPETITIONS, LinkedTransferQueue.class.getSimpleName()); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.LinkedTransferQueue queue = new java.util.concurrent.LinkedTransferQueue(); + final Integer initialValue = Integer.valueOf(777); + new LTQ().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + } + + static + class LTQ extends Base_TransferQueue {} + +} diff --git a/test/dorkbox/util/messagebus/queuePerf/MpmcArrayQueue.java b/test/dorkbox/util/messagebus/queuePerf/MpmcArrayQueue.java new file mode 100644 index 0000000..a7108dd --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/MpmcArrayQueue.java @@ -0,0 +1,25 @@ +package dorkbox.util.messagebus.queuePerf; + +@SuppressWarnings("Duplicates") +public class MpmcArrayQueue extends Base_BlockingQueue { + + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 3; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s: \n", REPETITIONS, MpmcArrayQueue.class.getSimpleName()); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final org.jctools.queues.MpmcArrayQueue queue = new org.jctools.queues.MpmcArrayQueue(1 << 17); + final Integer initialValue = Integer.valueOf(777); + new MpmcArray_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + } + + static + class MpmcArray_NonBlock extends Base_Queue {} +} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcArrayQueue_Concurrent.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcArrayQueue_Concurrent.java index cdd5ee5..e30e64d 100644 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcArrayQueue_Concurrent.java +++ b/test/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcArrayQueue_Concurrent.java @@ -107,7 +107,6 @@ public class PerfTest_MpmcArrayQueue_Concurrent { do { while (!producer.offer(TEST_VALUE)) { - Thread.yield(); } } while (0 != --i); } @@ -130,7 +129,6 @@ public class PerfTest_MpmcArrayQueue_Concurrent { do { while (null == (result = consumer.poll())) { - Thread.yield(); } } while (0 != --i); diff --git a/test/dorkbox/util/messagebus/queuePerf/SynchronousQueue.java b/test/dorkbox/util/messagebus/queuePerf/SynchronousQueue.java new file mode 100644 index 0000000..243e1d7 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/SynchronousQueue.java @@ -0,0 +1,36 @@ +package dorkbox.util.messagebus.queuePerf; + +public class SynchronousQueue { + public static final int REPETITIONS = 50 * 1000 * 100; + + private static final int bestRunsToAverage = 4; + private static final int runs = 10; + private static final int warmups = 3; + + public static void main(final String[] args) throws Exception { + System.out.format("reps: %,d %s\n", REPETITIONS, SynchronousQueue.class.getSimpleName()); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.SynchronousQueue queue = new java.util.concurrent.SynchronousQueue(); + final Integer initialValue = Integer.valueOf(777); + new SQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + + System.out.println(""); + System.out.println(""); + + for (int concurrency = 1; concurrency < 5; concurrency++) { + final java.util.concurrent.SynchronousQueue queue = new java.util.concurrent.SynchronousQueue(); + final Integer initialValue = Integer.valueOf(777); + new SQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue, + initialValue); + } + } + + static + class SQ_Block extends Base_BlockingQueue {} + + static + class SQ_NonBlock extends Base_Queue {} +} diff --git a/test/dorkbox/util/messagebus/queuePerf/TwoToTwoWorkProcessorThroughputTest.java b/test/dorkbox/util/messagebus/queuePerf/TwoToTwoWorkProcessorThroughputTest.java new file mode 100644 index 0000000..0832870 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/TwoToTwoWorkProcessorThroughputTest.java @@ -0,0 +1,156 @@ +package dorkbox.util.messagebus.queuePerf; + +import com.lmax.disruptor.BusySpinWaitStrategy; +import com.lmax.disruptor.IgnoreExceptionHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.WorkProcessor; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.locks.LockSupport; + +import static com.lmax.disruptor.RingBuffer.createMultiProducer; + +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *
+ * Sequence a series of events from multiple publishers going to multiple work processors.
+ *
+ * +----+                  +-----+
+ * | P1 |---+          +-->| WP1 |
+ * +----+   |  +-----+ |   +-----+
+ *          +->| RB1 |-+
+ * +----+   |  +-----+ |   +-----+
+ * | P2 |---+          +-->| WP2 |
+ * +----+                  +-----+
+ *
+ * P1  - Publisher 1
+ * P2  - Publisher 2
+ * RB  - RingBuffer
+ * WP1 - EventProcessor 1
+ * WP2 - EventProcessor 2
+ * 
+ */ +public final class TwoToTwoWorkProcessorThroughputTest extends AbstractPerfTestDisruptor +{ + private static final int NUM_PUBLISHERS = 1; + private static final int BUFFER_SIZE = 1024 * 64; + private static final long ITERATIONS = 1000L * 1000L * 1L; + private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 2, DaemonThreadFactory.INSTANCE); + private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1); + + /////////////////////////////////////////////////////////////////////////////////////////////// + + private final RingBuffer ringBuffer = + createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy()); + + private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + private final Sequence workSequence = new Sequence(-1); + + private final ValueAdditionWorkHandler[] handlers = new ValueAdditionWorkHandler[1]; + + { + handlers[0] = new ValueAdditionWorkHandler(); +// handlers[1] = new ValueAdditionWorkHandler(); + } + + @SuppressWarnings("unchecked") + private final WorkProcessor[] workProcessors = new WorkProcessor[1]; + + { + workProcessors[0] = new WorkProcessor( + ringBuffer, sequenceBarrier, + handlers[0], new IgnoreExceptionHandler(), + workSequence); +// workProcessors[1] = new WorkProcessor( +// ringBuffer, sequenceBarrier, +// handlers[1], new IgnoreExceptionHandler(), +// workSequence); + } + + private final ValuePublisher[] valuePublishers = new ValuePublisher[NUM_PUBLISHERS]; + + { + for (int i = 0; i < NUM_PUBLISHERS; i++) + { + valuePublishers[i] = new ValuePublisher(cyclicBarrier, ringBuffer, ITERATIONS); + } + + ringBuffer.addGatingSequences(workProcessors[0].getSequence() +// , workProcessors[1].getSequence() + ); + } + + /////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + protected int getRequiredProcessorCount() + { + return 4; + } + + @Override + protected long runDisruptorPass() throws Exception + { + long expected = ringBuffer.getCursor() + (NUM_PUBLISHERS * ITERATIONS); + Future[] futures = new Future[NUM_PUBLISHERS]; + for (int i = 0; i < NUM_PUBLISHERS; i++) + { + futures[i] = executor.submit(valuePublishers[i]); + } + + for (WorkProcessor processor : workProcessors) + { + executor.submit(processor); + } + + long start = System.currentTimeMillis(); + cyclicBarrier.await(); + + for (int i = 0; i < NUM_PUBLISHERS; i++) + { + futures[i].get(); + } + + while (workSequence.get() < expected) + { + LockSupport.parkNanos(1L); + } + + long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start); + + Thread.sleep(1000); + + for (WorkProcessor processor : workProcessors) + { + processor.halt(); + } + + return opsPerSecond; + } + + public static void main(String[] args) throws Exception + { + new TwoToTwoWorkProcessorThroughputTest().testImplementations(); + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/ValueAdditionWorkHandler.java b/test/dorkbox/util/messagebus/queuePerf/ValueAdditionWorkHandler.java new file mode 100644 index 0000000..bf26e21 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/ValueAdditionWorkHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 2012 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus.queuePerf; + +import com.lmax.disruptor.WorkHandler; + +public class ValueAdditionWorkHandler implements WorkHandler +{ + private long total; + + @Override + public void onEvent(ValueEvent event) throws Exception + { + long value = event.getValue(); + total += value; + } + + public long getTotal() + { + return total; + } +} diff --git a/test/dorkbox/util/messagebus/queuePerf/ValueEvent.java b/test/dorkbox/util/messagebus/queuePerf/ValueEvent.java new file mode 100644 index 0000000..dd2c096 --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/ValueEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus.queuePerf; + +import com.lmax.disruptor.EventFactory; + +public final class ValueEvent +{ + private long value; + + public long getValue() + { + return value; + } + + public void setValue(final long value) + { + this.value = value; + } + + public static final EventFactory EVENT_FACTORY = new EventFactory() + { + public ValueEvent newInstance() + { + return new ValueEvent(); + } + }; +} diff --git a/test/dorkbox/util/messagebus/queuePerf/ValuePublisher.java b/test/dorkbox/util/messagebus/queuePerf/ValuePublisher.java new file mode 100644 index 0000000..c4d57dd --- /dev/null +++ b/test/dorkbox/util/messagebus/queuePerf/ValuePublisher.java @@ -0,0 +1,56 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.util.messagebus.queuePerf; + +import com.lmax.disruptor.RingBuffer; + +import java.util.concurrent.CyclicBarrier; + +public final class ValuePublisher implements Runnable +{ + private final CyclicBarrier cyclicBarrier; + private final RingBuffer ringBuffer; + private final long iterations; + + public ValuePublisher( + final CyclicBarrier cyclicBarrier, final RingBuffer ringBuffer, final long iterations) + { + this.cyclicBarrier = cyclicBarrier; + this.ringBuffer = ringBuffer; + this.iterations = iterations; + } + + @Override + public void run() + { + try + { + cyclicBarrier.await(); + + for (long i = 0; i < iterations; i++) + { + long sequence = ringBuffer.next(); + ValueEvent event = ringBuffer.get(sequence); + event.setValue(i); + ringBuffer.publish(sequence); + } + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + } +}