Added disruptor - AWESOME fast in speed tests. Has problems because of pub/sub lock contention when used in MessageBus

This commit is contained in:
nathan 2016-01-13 18:05:43 +01:00
parent 712b0d8265
commit 4a42f395a6
24 changed files with 1977 additions and 125 deletions

View File

@ -0,0 +1,19 @@
package com.lmax.disruptor;
import dorkbox.util.messagebus.MessageHolder;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class EventBusFactory implements EventFactory<MessageHolder> {
public EventBusFactory() {
}
@Override
public
MessageHolder newInstance() {
return new MessageHolder();
}
}

View File

@ -0,0 +1,33 @@
package com.lmax.disruptor;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
public final class PublicationExceptionHandler<T> implements ExceptionHandler<T> {
private final ErrorHandlingSupport errorHandler;
public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void handleEventException(final Throwable e, final long sequence, final T event) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")")
.setCause(e));
}
@Override
public void handleOnStartException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error starting the disruptor")
.setCause(e));
}
@Override
public void handleOnShutdownException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error stopping the disruptor")
.setCause(e));
}
}

View File

@ -15,21 +15,36 @@
*/
package dorkbox.util.messagebus;
import com.lmax.disruptor.EventBusFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PublicationExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.publication.*;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.publication.PublisherAll_MultiArg;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_FirstArg;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_MultiArg;
import dorkbox.util.messagebus.publication.PublisherExact_FirstArg;
import dorkbox.util.messagebus.publication.PublisherExact_MultiArg;
import dorkbox.util.messagebus.subscription.FirstArgSubscriber;
import dorkbox.util.messagebus.subscription.MultiArgSubscriber;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
import org.jctools.util.Pow2;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
@ -40,25 +55,30 @@ import java.util.Collection;
public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
private final MpmcMultiTransferArrayQueue dispatchQueue;
// private final ArrayBlockingQueue<Object> dispatchQueue;
// private final LinkedTransferQueue<Object> dispatchQueue;
// private final Collection<Thread> threads;
private final ClassUtils classUtils;
private final SubscriptionManager subscriptionManager;
private final Collection<Thread> threads;
private final Publisher subscriptionPublisher;
/**
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown;
private volatile boolean shuttingDown = false;
private WorkProcessor[] workProcessors;
private MessageHandler[] handlers;
private RingBuffer<MessageHolder> ringBuffer;
private Sequence workSequence;
/**
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
*/
public
MessageBus() {
this(Runtime.getRuntime().availableProcessors() / 2);
this(Runtime.getRuntime().availableProcessors());
}
/**
@ -75,10 +95,12 @@ class MessageBus implements IMessageBus {
*/
public
MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) {
numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads));
// round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
this.errorHandler = new DefaultErrorHandler();
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
// this.dispatchQueue = new ArrayBlockingQueue<Object>(6);
// this.dispatchQueue = new LinkedTransferQueue<Object>();
classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final StampedLock lock = new StampedLock();
@ -124,79 +146,168 @@ class MessageBus implements IMessageBus {
}
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
MpmcMultiTransferArrayQueue IN_QUEUE = MessageBus.this.dispatchQueue;
MultiNode node = new MultiNode();
while (!MessageBus.this.shuttingDown) {
try {
//noinspection InfiniteLoopStatement
while (true) {
IN_QUEUE.take(node);
Integer type = (Integer) MultiNode.lpMessageType(node);
switch (type) {
case 1: {
publish(MultiNode.lpItem1(node));
break;
}
case 2: {
publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node));
break;
}
case 3: {
publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node));
break;
}
default: {
publish(MultiNode.lpItem1(node));
}
}
}
} catch (InterruptedException e) {
if (!MessageBus.this.shuttingDown) {
Integer type = (Integer) MultiNode.lpMessageType(node);
switch (type) {
case 1: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
MultiNode.lpItem1(node)));
break;
}
case 2: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
MultiNode.lpItem1(node), MultiNode.lpItem2(node)));
break;
}
case 3: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)));
break;
}
default: {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
MultiNode.lpItem1(node)));
}
}
}
}
}
}
};
// Now we setup the disruptor and work handlers
Thread runner = dispatchThreadFactory.newThread(runnable);
this.threads.add(runner);
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
new NamedThreadFactory("MessageBus"));
final PublicationExceptionHandler<MessageHolder> exceptionHandler = new PublicationExceptionHandler<MessageHolder>(errorHandler);
EventBusFactory factory = new EventBusFactory();
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(subscriptionPublisher); // exactly one per thread is used
}
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
final int BUFFER_SIZE = 8;
WaitStrategy consumerWaitStrategy;
consumerWaitStrategy = new LiteBlockingWaitStrategy();
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
final int numWorkers = handlers.length;
workProcessors = new WorkProcessor[numWorkers];
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer,
sequenceBarrier,
handlers[i],
exceptionHandler, workSequence);
}
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence()
.set(cursor);
executor.execute(processor);
}
// this.threads = new ArrayDeque<Thread>(numberOfThreads);
// final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
// for (int i = 0; i < numberOfThreads; i++) {
//
// // each thread will run forever and process incoming message publication requests
// Runnable runnable = new Runnable() {
// @Override
// public
// void run() {
// ArrayBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//// LinkedTransferQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//
// MultiNode node = new MultiNode();
// while (!MessageBus.this.shuttingDown) {
// try {
// //noinspection InfiniteLoopStatement
// while (true) {
//// IN_QUEUE.take(node);
// final Object take = IN_QUEUE.take();
//// Integer type = (Integer) MultiNode.lpMessageType(node);
//// switch (type) {
//// case 1: {
// publish(take);
//// break;
//// }
//// case 2: {
//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node));
//// break;
//// }
//// case 3: {
//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node));
//// break;
//// }
//// default: {
//// publish(MultiNode.lpItem1(node));
//// }
//// }
// }
// } catch (InterruptedException e) {
// if (!MessageBus.this.shuttingDown) {
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// break;
// }
// case 2: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node)));
// break;
// }
// case 3: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node),
// MultiNode.lpItem3(node)));
// break;
// }
// default: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// }
// }
// }
// }
// }
// }
// };
//
// Thread runner = threadFactory.newThread(runnable);
// this.threads.add(runner);
// }
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[workProcessors.length + 1];
for (int i = 0, size = workProcessors.length; i < size; i++) {
sequences[i] = workProcessors[i].getSequence();
}
sequences[sequences.length - 1] = workSequence; // always add the work sequence
return sequences;
}
/**
@ -250,70 +361,96 @@ class MessageBus implements IMessageBus {
public
void publishAsync(final Object message) {
if (message != null) {
final long seq = ringBuffer.next();
try {
this.dispatchQueue.transfer(message, MessageType.ONE);
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE;
job.message1 = message;
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
else {
throw new NullPointerException("Message cannot be null.");
}
// try {
// this.dispatchQueue.put(message);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
// }
}
@Override
public
void publishAsync(final Object message1, final Object message2) {
if (message1 != null && message2 != null) {
try {
this.dispatchQueue.transfer(message1, message2);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2));
}
}
else {
throw new NullPointerException("Messages cannot be null.");
}
// if (message1 != null && message2 != null) {
// try {
// this.dispatchQueue.transfer(message1, message2);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2));
// }
// }
// else {
// throw new NullPointerException("Messages cannot be null.");
// }
}
@Override
public
void publishAsync(final Object message1, final Object message2, final Object message3) {
if (message1 != null || message2 != null | message3 != null) {
try {
this.dispatchQueue.transfer(message1, message2, message3);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2, message3));
}
}
else {
throw new NullPointerException("Messages cannot be null.");
}
// if (message1 != null || message2 != null | message3 != null) {
// try {
// this.dispatchQueue.transfer(message1, message2, message3);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message1, message2, message3));
// }
// }
// else {
// throw new NullPointerException("Messages cannot be null.");
// }
}
@Override
public
void publishAsync(final Object[] messages) {
if (messages != null) {
try {
this.dispatchQueue.transfer(messages, MessageType.ARRAY);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Error while adding an asynchronous message").setCause(e).setPublishedObject(messages));
}
}
else {
throw new NullPointerException("Message cannot be null.");
}
// if (messages != null) {
// try {
// this.dispatchQueue.transfer(messages, MessageType.ARRAY);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(messages));
// }
// }
// else {
// throw new NullPointerException("Message cannot be null.");
// }
}
@Override
public final
boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
// return !this.dispatchQueue.isEmpty();
}
@Override
@ -325,21 +462,39 @@ class MessageBus implements IMessageBus {
@Override
public
void start() {
for (Thread t : this.threads) {
t.start();
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
errorHandler.init();
// for (Thread t : this.threads) {
// t.start();
// }
errorHandler.start();
}
@Override
public
void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
// for (Thread t : this.threads) {
// t.interrupt();
// }
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
this.subscriptionManager.shutdown();
this.classUtils.clear();
}
}

View File

@ -0,0 +1,73 @@
package dorkbox.util.messagebus;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import dorkbox.util.messagebus.publication.Publisher;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final Publisher publisher;
AtomicBoolean shutdown = new AtomicBoolean(false);
public
MessageHandler(Publisher publisher) {
this.publisher = publisher;
}
@Override
public
void onEvent(final MessageHolder event) throws Exception {
final int messageType = event.type;
switch (messageType) {
case MessageType.ONE: {
Object message1 = event.message1;
// System.err.println("(" + sequence + ")" + message1);
// this.workProcessor.release(sequence);
this.publisher.publish(message1);
return;
}
case MessageType.TWO: {
Object message1 = event.message1;
Object message2 = event.message2;
this.publisher.publish(message1, message2);
return;
}
case MessageType.THREE: {
Object message1 = event.message1;
Object message2 = event.message2;
Object message3 = event.message3;
this.publisher.publish(message1, message2, message3);
return;
}
case MessageType.ARRAY: {
Object[] messages = event.messages;
this.publisher.publish(messages);
return;
}
}
}
@Override
public
void onStart() {
}
@Override
public synchronized
void onShutdown() {
shutdown.set(true);
}
public
boolean isShutdown() {
return shutdown.get();
}
}

View File

@ -0,0 +1,17 @@
package dorkbox.util.messagebus;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public
class MessageHolder {
public int type = MessageType.ONE;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public
MessageHolder() {}
}

View File

@ -64,7 +64,7 @@ class DefaultErrorHandler implements ErrorHandlingSupport {
@Override
public
void start() {
void init() {
synchronized (this.errorHandlers) {
if (this.errorHandlers.isEmpty()) {
this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger());

View File

@ -57,5 +57,5 @@ interface ErrorHandlingSupport {
void handleError(String error, Class<?> listenerClass);
void start();
void init();
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2011 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.queuePerf;
public abstract class AbstractPerfTestDisruptor
{
public static final int RUNS = 7;
protected void testImplementations()
throws Exception
{
final int availableProcessors = Runtime.getRuntime().availableProcessors();
if (getRequiredProcessorCount() > availableProcessors)
{
System.out.print("*** Warning ***: your system has insufficient processors to execute the test efficiently. ");
System.out.println("Processors required = " + getRequiredProcessorCount() + " available = " + availableProcessors);
}
long[] disruptorOps = new long[RUNS];
System.out.println("Starting Disruptor tests");
for (int i = 0; i < RUNS; i++)
{
System.gc();
disruptorOps[i] = runDisruptorPass();
System.out.format("Run %d, Disruptor=%,d ops/sec%n", i, Long.valueOf(disruptorOps[i]));
}
}
public static void printResults(final String className, final long[] disruptorOps, final long[] queueOps)
{
for (int i = 0; i < RUNS; i++)
{
System.out.format("%s run %d: BlockingQueue=%,d Disruptor=%,d ops/sec\n",
className, Integer.valueOf(i), Long.valueOf(queueOps[i]), Long.valueOf(disruptorOps[i]));
}
}
protected abstract int getRequiredProcessorCount();
protected abstract long runDisruptorPass() throws Exception;
}

View File

@ -0,0 +1,38 @@
package dorkbox.util.messagebus.queuePerf;
@SuppressWarnings("Duplicates")
public class ArrayBlockingQueue extends Base_BlockingQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s: \n", REPETITIONS, ArrayBlockingQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.ArrayBlockingQueue queue = new java.util.concurrent.ArrayBlockingQueue(1024);
final Integer initialValue = Integer.valueOf(777);
new ABQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
// System.out.println("");
// System.out.println("");
//
// for (int concurrency = 1; concurrency < 5; concurrency++) {
// final java.util.concurrent.ArrayBlockingQueue queue = new java.util.concurrent.ArrayBlockingQueue(1024);
// final Integer initialValue = Integer.valueOf(777);
// new ABQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
// initialValue);
// }
}
static
class ABQ_Block extends Base_BlockingQueue<Integer> {}
static
class ABQ_NonBlock extends Base_Queue<Integer> {}
}

View File

@ -0,0 +1,187 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@SuppressWarnings("Duplicates")
public
class Base_BlockingQueue<T> {
public
void run(final int repetitions,
final int producersCount,
final int consumersCount,
final int warmups,
final int runs,
final int bestRunsToAverage,
final boolean showStats,
final BlockingQueue<T> queue,
final T initialValue) throws Exception {
for (int i = 0; i < warmups; i++) {
performanceRun(i, queue,
false, producersCount, consumersCount, repetitions, initialValue);
}
final Long[] results = new Long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue);
}
// average best results for summary
List<Long> list = Arrays.asList(results);
Collections.sort(list);
long sum = 0;
// ignore the highest one
int limit = runs - 1;
for (int i = limit - bestRunsToAverage; i < limit; i++) {
sum += list.get(i);
}
long average = sum / bestRunsToAverage;
System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(),
producersCount, consumersCount, average);
}
/**
* Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to
* a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent.
*/
private
long performanceRun(final int runNumber,
final BlockingQueue<T> queue,
final boolean showStats,
final int producersCount,
final int consumersCount,
int repetitions,
T initialValue) throws Exception {
// make sure it's evenly divisible by both producers and consumers
final int adjusted = repetitions / producersCount / consumersCount;
int pRepetitions = adjusted * producersCount;
int cRepetitions = adjusted * consumersCount;
Producer[] producers = new Producer[producersCount];
Consumer[] consumers = new Consumer[consumersCount];
Thread[] pThreads = new Thread[producersCount];
Thread[] cThreads = new Thread[consumersCount];
for (int i=0;i<producersCount;i++) {
producers[i] = new Producer<T>(queue, pRepetitions, initialValue);
}
for (int i=0;i<consumersCount;i++) {
consumers[i] = new Consumer<T>(queue, cRepetitions);
}
for (int i=0;i<producersCount;i++) {
pThreads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].join();
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<producersCount;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
}
for (int i=0;i<consumersCount;i++) {
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1000000000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
public static class Producer<T> implements Runnable {
private final BlockingQueue<T> queue;
volatile long start;
private int repetitions;
private final T initialValue;
public Producer(BlockingQueue<T> queue, int repetitions, T initialValue) {
this.queue = queue;
this.repetitions = repetitions;
this.initialValue = initialValue;
}
@Override
public void run() {
BlockingQueue<T> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
final T initialValue = this.initialValue;
try {
do {
producer.put(initialValue);
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class Consumer<T> implements Runnable {
private final BlockingQueue<T> queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(BlockingQueue<T> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
public void run() {
BlockingQueue<T> consumer = this.queue;
int i = this.repetitions;
T result = null;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -0,0 +1,367 @@
package dorkbox.util.messagebus.queuePerf;
import com.lmax.disruptor.*;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.jctools.util.Pow2;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
@SuppressWarnings("Duplicates")
public
class Base_Disruptor<T> {
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private static boolean SHOW = true;
// must be power of 2.
private final int ringBufferSize = Pow2.roundToPowerOfTwo(AVAILABLE_PROCESSORS * 32);
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 0;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s: \n", REPETITIONS, "Disruptor");
for (int concurrency = 1; concurrency < 5; concurrency++) {
final Integer initialValue = Integer.valueOf(777);
new Base_Disruptor().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false,
// null,
initialValue);
}
}
public
void run(final int repetitions,
final int producersCount,
final int consumersCount,
final int warmups,
final int runs,
final int bestRunsToAverage,
final boolean showStats,
// final TransferQueue<T> queue,
final T initialValue) throws Exception {
// for (int i = 0; i < warmups; i++) {
// performanceRun(i, ringBuffer,
// false, producersCount, consumersCount, handlers, workerPool, repetitions, initialValue);
// }
final Long[] results = new Long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, showStats, producersCount, consumersCount, repetitions,
initialValue);
}
// average best results for summary
List<Long> list = Arrays.asList(results);
Collections.sort(list);
long sum = 0;
// ignore the highest one
int limit = runs - 1;
for (int i = limit - bestRunsToAverage; i < limit; i++) {
sum += list.get(i);
}
long average = sum / bestRunsToAverage;
System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(),
// queue.getClass().getSimpleName(),
"Disruptor",
producersCount, consumersCount, average);
}
/**
* Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to
* a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent.
*/
private
long performanceRun(final int runNumber,
final boolean showStats,
final int producersCount,
final int consumersCount,
int repetitions,
T initialValue) throws Exception {
// make sure it's evenly divisible by both producers and consumers
final int adjusted = repetitions / producersCount / consumersCount;
int pRepetitions = adjusted * producersCount;
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
final int BUFFER_SIZE = 1024;
ExecutorService executor = new ThreadPoolExecutor(consumersCount, consumersCount, 0,
TimeUnit.NANOSECONDS,
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
DaemonThreadFactory.INSTANCE);
final PubExceptionHandler exceptionHandler = new PubExceptionHandler();
ValueFactory<T> factory = new ValueFactory<T>();
WorkHandler<ValueHolder<T>> handlers[] = new EventHandler[consumersCount];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new EventHandler<T>(); // exactly one per thread
}
WaitStrategy consumerWaitStrategy;
consumerWaitStrategy = new LiteBlockingWaitStrategy();
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
if (SHOW) {
SHOW = false;
System.err.println(BUFFER_SIZE + " : LiteBlockingWaitStrategy)");
}
RingBuffer<ValueHolder<T>> ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
final int numWorkers = handlers.length;
WorkProcessor[] workProcessors = new WorkProcessor[numWorkers];
Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<ValueHolder<T>>(ringBuffer,
sequenceBarrier,
handlers[i],
exceptionHandler,
workSequence);
}
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = new Sequence[workProcessors.length + 1];
for (int i = 0, size = workProcessors.length; i < size; i++) {
sequences[i] = workProcessors[i].getSequence();
}
sequences[sequences.length - 1] = workSequence; // always add the work sequence
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
long expected = cursor + (producersCount * pRepetitions); // saved so we know when it is finished producing events
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence()
.set(cursor);
executor.execute(processor);
}
Producer[] producers = new Producer[producersCount];
Thread[] pThreads = new Thread[producersCount];
for (int i = 0; i < producersCount; i++) {
producers[i] = new Producer<T>(ringBuffer, pRepetitions, initialValue);
}
for (int i = 0; i < producersCount; i++) {
pThreads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i = 0; i < producersCount; i++) {
pThreads[i].start();
}
for (int i = 0; i < producersCount; i++) {
pThreads[i].join();
}
while (workSequence.get() < expected) {
LockSupport.parkNanos(1L);
}
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (int i=0;i<consumersCount;i++) {
EventHandler h = (EventHandler) handlers[i];
while (!h.isShutdown()) {
Thread.yield();
}
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<producersCount;i++) {
if (producers[i].start - start < 0) {
start = producers[i].start;
}
}
for (int i=0;i<consumersCount;i++) {
EventHandler h = (EventHandler) handlers[i];
final long end1 = h.getEnd();
if (end1 - end > 0) {
end = end1;
}
}
long duration = end - start;
long ops = repetitions * 1000000000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
public class Producer<T> implements Runnable {
private final RingBuffer<ValueHolder<T>> queue;
volatile long start;
private int repetitions;
private final T initialValue;
public Producer(RingBuffer<ValueHolder<T>> queue, int repetitions, T initialValue) {
this.queue = queue;
this.repetitions = repetitions;
this.initialValue = initialValue;
}
@Override
public void run() {
RingBuffer<ValueHolder<T>> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
final T initialValue = this.initialValue;
try {
do {
// setup the job
final long seq = producer.next();
// try {
ValueHolder<T> eventJob = producer.get(seq);
eventJob.item = initialValue;
// } finally {
// always publish the job
producer.publish(seq);
// }
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public final class PubExceptionHandler implements ExceptionHandler {
public
PubExceptionHandler() {
}
@Override
public void handleEventException(final Throwable e, final long sequence, final Object event) {
System.err.println("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ") " + e.getMessage());
}
@Override
public void handleOnStartException(final Throwable e) {
System.err.println("Error starting the disruptor " + e.getMessage());
}
@Override
public void handleOnShutdownException(final Throwable e) {
System.err.println("Error stopping the disruptor " + e.getMessage());
}
}
class ValueFactory<T> implements EventFactory<ValueHolder<T>> {
public ValueFactory() {
}
@Override
public ValueHolder<T> newInstance() {
return new ValueHolder<T>();
}
}
class ValueHolder<T> {
public T item = null;
public ValueHolder() {}
}
class EventHandler<T> implements WorkHandler<ValueHolder<T>>, LifecycleAware{
public long count = 0;
AtomicBoolean shutdown = new AtomicBoolean(false);
private long end = 0;
public
EventHandler() {
}
public synchronized
long getEnd() {
return end;
}
@Override
public
void onEvent(final ValueHolder<T> event) throws Exception {
// count += 1;
end = System.nanoTime();
}
@Override
public
void onStart() {
}
@Override
public synchronized
void onShutdown() {
// count -= count;
end = System.nanoTime();
// end += count;
shutdown.set(true);
}
public
boolean isShutdown() {
return shutdown.get();
}
}
}

View File

@ -0,0 +1,190 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
@SuppressWarnings("Duplicates")
public
class Base_Queue<T> {
public
void run(final int repetitions,
final int producersCount,
final int consumersCount,
final int warmups,
final int runs,
final int bestRunsToAverage,
final boolean showStats,
final Queue<T> queue,
final T initialValue) throws Exception {
for (int i = 0; i < warmups; i++) {
performanceRun(i, queue,
false, producersCount, consumersCount, repetitions, initialValue);
}
final Long[] results = new Long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue);
}
// average best results for summary
List<Long> list = Arrays.asList(results);
Collections.sort(list);
long sum = 0;
// ignore the highest one
int limit = runs - 1;
for (int i = limit - bestRunsToAverage; i < limit; i++) {
sum += list.get(i);
}
long average = sum / bestRunsToAverage;
System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(),
producersCount,
consumersCount, average);
}
/**
* Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to
* a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent.
*/
private
long performanceRun(final int runNumber,
final Queue<T> queue,
final boolean showStats,
final int producersCount,
final int consumersCount,
int repetitions,
T initialValue) throws Exception {
// make sure it's evenly divisible by both producers and consumers
final int adjusted = repetitions / producersCount / consumersCount;
int pRepetitions = adjusted * producersCount;
int cRepetitions = adjusted * consumersCount;
Producer[] producers = new Producer[producersCount];
Consumer[] consumers = new Consumer[consumersCount];
Thread[] pThreads = new Thread[producersCount];
Thread[] cThreads = new Thread[consumersCount];
for (int i=0;i<producersCount;i++) {
producers[i] = new Producer<T>(queue, pRepetitions, initialValue);
}
for (int i=0;i<consumersCount;i++) {
consumers[i] = new Consumer<T>(queue, cRepetitions);
}
for (int i=0;i<producersCount;i++) {
pThreads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].join();
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<producersCount;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
}
for (int i=0;i<consumersCount;i++) {
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1000000000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
public static class Producer<T> implements Runnable {
private final Queue<T> queue;
volatile long start;
private int repetitions;
private final T initialValue;
public Producer(Queue<T> queue, int repetitions, T initialValue) {
this.queue = queue;
this.repetitions = repetitions;
this.initialValue = initialValue;
}
@Override
public void run() {
Queue<T> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
final T initialValue = this.initialValue;
try {
do {
while (!producer.offer(initialValue)) {
}
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class Consumer<T> implements Runnable {
private final Queue<T> queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(Queue<T> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
public void run() {
Queue<T> consumer = this.queue;
int i = this.repetitions;
T result = null;
try {
do {
while (null == (result = consumer.poll())) {
}
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -0,0 +1,187 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TransferQueue;
@SuppressWarnings("Duplicates")
public
class Base_TransferQueue<T> {
public
void run(final int repetitions,
final int producersCount,
final int consumersCount,
final int warmups,
final int runs,
final int bestRunsToAverage,
final boolean showStats,
final TransferQueue<T> queue,
final T initialValue) throws Exception {
for (int i = 0; i < warmups; i++) {
performanceRun(i, queue,
false, producersCount, consumersCount, repetitions, initialValue);
}
final Long[] results = new Long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue);
}
// average best results for summary
List<Long> list = Arrays.asList(results);
Collections.sort(list);
long sum = 0;
// ignore the highest one
int limit = runs - 1;
for (int i = limit - bestRunsToAverage; i < limit; i++) {
sum += list.get(i);
}
long average = sum / bestRunsToAverage;
System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(),
producersCount, consumersCount, average);
}
/**
* Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to
* a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent.
*/
private
long performanceRun(final int runNumber,
final TransferQueue<T> queue,
final boolean showStats,
final int producersCount,
final int consumersCount,
int repetitions,
T initialValue) throws Exception {
// make sure it's evenly divisible by both producers and consumers
final int adjusted = repetitions / producersCount / consumersCount;
int pRepetitions = adjusted * producersCount;
int cRepetitions = adjusted * consumersCount;
Producer[] producers = new Producer[producersCount];
Consumer[] consumers = new Consumer[consumersCount];
Thread[] pThreads = new Thread[producersCount];
Thread[] cThreads = new Thread[consumersCount];
for (int i=0;i<producersCount;i++) {
producers[i] = new Producer<T>(queue, pRepetitions, initialValue);
}
for (int i=0;i<consumersCount;i++) {
consumers[i] = new Consumer<T>(queue, cRepetitions);
}
for (int i=0;i<producersCount;i++) {
pThreads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].start();
}
for (int i=0;i<producersCount;i++) {
pThreads[i].join();
}
for (int i=0;i<consumersCount;i++) {
cThreads[i].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<producersCount;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
}
for (int i=0;i<consumersCount;i++) {
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1000000000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
public static class Producer<T> implements Runnable {
private final TransferQueue<T> queue;
volatile long start;
private int repetitions;
private final T initialValue;
public Producer(TransferQueue<T> queue, int repetitions, T initialValue) {
this.queue = queue;
this.repetitions = repetitions;
this.initialValue = initialValue;
}
@Override
public void run() {
TransferQueue<T> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
final T initialValue = this.initialValue;
try {
do {
producer.transfer(initialValue);
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class Consumer<T> implements Runnable {
private final TransferQueue<T> queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(TransferQueue<T> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
public void run() {
TransferQueue<T> consumer = this.queue;
int i = this.repetitions;
T result = null;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (Exception e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -0,0 +1,98 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
@SuppressWarnings("Duplicates")
public
class Baseline_1P1C<T> {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 43;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s: \n", REPETITIONS, Baseline_1P1C.class.getSimpleName());
final java.util.concurrent.ConcurrentLinkedQueue queue = new java.util.concurrent.ConcurrentLinkedQueue();
final Integer initialValue = Integer.valueOf(777);
new Baseline_1P1C().run(REPETITIONS, 1, 1, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
public
void run(final int repetitions,
final int producersCount,
final int consumersCount,
final int warmups,
final int runs,
final int bestRunsToAverage,
final boolean showStats,
final Queue<T> queue,
final T initialValue) throws Exception {
for (int i = 0; i < warmups; i++) {
performanceRun(i, queue,
false, producersCount, consumersCount, repetitions, initialValue);
}
final Long[] results = new Long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, producersCount, consumersCount, repetitions, initialValue);
}
// average best results for summary
List<Long> list = Arrays.asList(results);
Collections.sort(list);
long sum = 0;
// ignore the highest one
int limit = runs - 1;
for (int i = limit - bestRunsToAverage; i < limit; i++) {
sum += list.get(i);
}
long average = sum / bestRunsToAverage;
System.out.format("%s,%s %dP/%dC %,d\n", this.getClass().getSimpleName(), queue.getClass().getSimpleName(),
producersCount,
consumersCount, average);
}
/**
* Benchmarks how long it takes to push X number of items total. If there are is 1P and 1C, then X items will be sent from a producer to
* a consumer. Of there are NP and NC threads, then X/N (for a total of X) items will be sent.
*/
private
long performanceRun(final int runNumber,
final Queue<T> queue,
final boolean showStats,
final int producersCount,
final int consumersCount,
int repetitions,
T initialValue) throws Exception {
// this just measure how long it takes to count from 0 - repetitions
long start = System.nanoTime();
long end = -1;
for (int i=0;i<repetitions;i++) {
end += 1;
}
end = System.nanoTime();
long duration = end - start;
long ops = repetitions * 1000000000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
}

View File

@ -0,0 +1,25 @@
package dorkbox.util.messagebus.queuePerf;
@SuppressWarnings("Duplicates")
public class ConcurrentLinkedQueue extends Base_BlockingQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s: \n", REPETITIONS, ConcurrentLinkedQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.ConcurrentLinkedQueue queue = new java.util.concurrent.ConcurrentLinkedQueue();
final Integer initialValue = Integer.valueOf(777);
new CLQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
}
static
class CLQ_NonBlock extends Base_Queue<Integer> {}
}

View File

@ -0,0 +1,36 @@
package dorkbox.util.messagebus.queuePerf;
public class LinkedBlockingQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s\n", REPETITIONS, LinkedBlockingQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.LinkedBlockingQueue queue = new java.util.concurrent.LinkedBlockingQueue(1024);
final Integer initialValue = Integer.valueOf(777);
new LBQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
System.out.println("");
System.out.println("");
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.LinkedBlockingQueue queue = new java.util.concurrent.LinkedBlockingQueue(1024);
final Integer initialValue = Integer.valueOf(777);
new LBQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
}
static
class LBQ_Block extends Base_BlockingQueue<Integer> {}
static
class LBQ_NonBlock extends Base_Queue<Integer> {}
}

View File

@ -0,0 +1,24 @@
package dorkbox.util.messagebus.queuePerf;
public class LinkedTransferQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s\n", REPETITIONS, LinkedTransferQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.LinkedTransferQueue queue = new java.util.concurrent.LinkedTransferQueue();
final Integer initialValue = Integer.valueOf(777);
new LTQ().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
}
static
class LTQ extends Base_TransferQueue<Integer> {}
}

View File

@ -0,0 +1,25 @@
package dorkbox.util.messagebus.queuePerf;
@SuppressWarnings("Duplicates")
public class MpmcArrayQueue extends Base_BlockingQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s: \n", REPETITIONS, MpmcArrayQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final org.jctools.queues.MpmcArrayQueue queue = new org.jctools.queues.MpmcArrayQueue(1 << 17);
final Integer initialValue = Integer.valueOf(777);
new MpmcArray_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
}
static
class MpmcArray_NonBlock extends Base_Queue<Integer> {}
}

View File

@ -107,7 +107,6 @@ public class PerfTest_MpmcArrayQueue_Concurrent {
do {
while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i);
}
@ -130,7 +129,6 @@ public class PerfTest_MpmcArrayQueue_Concurrent {
do {
while (null == (result = consumer.poll())) {
Thread.yield();
}
} while (0 != --i);

View File

@ -0,0 +1,36 @@
package dorkbox.util.messagebus.queuePerf;
public class SynchronousQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
private static final int bestRunsToAverage = 4;
private static final int runs = 10;
private static final int warmups = 3;
public static void main(final String[] args) throws Exception {
System.out.format("reps: %,d %s\n", REPETITIONS, SynchronousQueue.class.getSimpleName());
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.SynchronousQueue queue = new java.util.concurrent.SynchronousQueue();
final Integer initialValue = Integer.valueOf(777);
new SQ_Block().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
System.out.println("");
System.out.println("");
for (int concurrency = 1; concurrency < 5; concurrency++) {
final java.util.concurrent.SynchronousQueue queue = new java.util.concurrent.SynchronousQueue();
final Integer initialValue = Integer.valueOf(777);
new SQ_NonBlock().run(REPETITIONS, concurrency, concurrency, warmups, runs, bestRunsToAverage, false, queue,
initialValue);
}
}
static
class SQ_Block extends Base_BlockingQueue<Integer> {}
static
class SQ_NonBlock extends Base_Queue<Integer> {}
}

View File

@ -0,0 +1,156 @@
package dorkbox.util.messagebus.queuePerf;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkProcessor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.LockSupport;
import static com.lmax.disruptor.RingBuffer.createMultiProducer;
/*
* Copyright 2011 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* <pre>
* Sequence a series of events from multiple publishers going to multiple work processors.
*
* +----+ +-----+
* | P1 |---+ +-->| WP1 |
* +----+ | +-----+ | +-----+
* +->| RB1 |-+
* +----+ | +-----+ | +-----+
* | P2 |---+ +-->| WP2 |
* +----+ +-----+
*
* P1 - Publisher 1
* P2 - Publisher 2
* RB - RingBuffer
* WP1 - EventProcessor 1
* WP2 - EventProcessor 2
* </pre>
*/
public final class TwoToTwoWorkProcessorThroughputTest extends AbstractPerfTestDisruptor
{
private static final int NUM_PUBLISHERS = 1;
private static final int BUFFER_SIZE = 1024 * 64;
private static final long ITERATIONS = 1000L * 1000L * 1L;
private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 2, DaemonThreadFactory.INSTANCE);
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
///////////////////////////////////////////////////////////////////////////////////////////////
private final RingBuffer<ValueEvent> ringBuffer =
createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
private final Sequence workSequence = new Sequence(-1);
private final ValueAdditionWorkHandler[] handlers = new ValueAdditionWorkHandler[1];
{
handlers[0] = new ValueAdditionWorkHandler();
// handlers[1] = new ValueAdditionWorkHandler();
}
@SuppressWarnings("unchecked")
private final WorkProcessor<ValueEvent>[] workProcessors = new WorkProcessor[1];
{
workProcessors[0] = new WorkProcessor<ValueEvent>(
ringBuffer, sequenceBarrier,
handlers[0], new IgnoreExceptionHandler(),
workSequence);
// workProcessors[1] = new WorkProcessor<ValueEvent>(
// ringBuffer, sequenceBarrier,
// handlers[1], new IgnoreExceptionHandler(),
// workSequence);
}
private final ValuePublisher[] valuePublishers = new ValuePublisher[NUM_PUBLISHERS];
{
for (int i = 0; i < NUM_PUBLISHERS; i++)
{
valuePublishers[i] = new ValuePublisher(cyclicBarrier, ringBuffer, ITERATIONS);
}
ringBuffer.addGatingSequences(workProcessors[0].getSequence()
// , workProcessors[1].getSequence()
);
}
///////////////////////////////////////////////////////////////////////////////////////////////
@Override
protected int getRequiredProcessorCount()
{
return 4;
}
@Override
protected long runDisruptorPass() throws Exception
{
long expected = ringBuffer.getCursor() + (NUM_PUBLISHERS * ITERATIONS);
Future<?>[] futures = new Future[NUM_PUBLISHERS];
for (int i = 0; i < NUM_PUBLISHERS; i++)
{
futures[i] = executor.submit(valuePublishers[i]);
}
for (WorkProcessor<ValueEvent> processor : workProcessors)
{
executor.submit(processor);
}
long start = System.currentTimeMillis();
cyclicBarrier.await();
for (int i = 0; i < NUM_PUBLISHERS; i++)
{
futures[i].get();
}
while (workSequence.get() < expected)
{
LockSupport.parkNanos(1L);
}
long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
Thread.sleep(1000);
for (WorkProcessor<ValueEvent> processor : workProcessors)
{
processor.halt();
}
return opsPerSecond;
}
public static void main(String[] args) throws Exception
{
new TwoToTwoWorkProcessorThroughputTest().testImplementations();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2012 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.queuePerf;
import com.lmax.disruptor.WorkHandler;
public class ValueAdditionWorkHandler implements WorkHandler<ValueEvent>
{
private long total;
@Override
public void onEvent(ValueEvent event) throws Exception
{
long value = event.getValue();
total += value;
}
public long getTotal()
{
return total;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2011 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.queuePerf;
import com.lmax.disruptor.EventFactory;
public final class ValueEvent
{
private long value;
public long getValue()
{
return value;
}
public void setValue(final long value)
{
this.value = value;
}
public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
{
public ValueEvent newInstance()
{
return new ValueEvent();
}
};
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2011 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.queuePerf;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.CyclicBarrier;
public final class ValuePublisher implements Runnable
{
private final CyclicBarrier cyclicBarrier;
private final RingBuffer<ValueEvent> ringBuffer;
private final long iterations;
public ValuePublisher(
final CyclicBarrier cyclicBarrier, final RingBuffer<ValueEvent> ringBuffer, final long iterations)
{
this.cyclicBarrier = cyclicBarrier;
this.ringBuffer = ringBuffer;
this.iterations = iterations;
}
@Override
public void run()
{
try
{
cyclicBarrier.await();
for (long i = 0; i < iterations; i++)
{
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(i);
ringBuffer.publish(sequence);
}
}
catch (Exception ex)
{
throw new RuntimeException(ex);
}
}
}