using disruptor + LTQ

This commit is contained in:
nathan 2015-02-13 11:02:43 +01:00
parent 3418b08529
commit 139f4c3069
4 changed files with 372 additions and 286 deletions

View File

@ -3,19 +3,32 @@ package net.engio.mbassy.multi;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.engio.mbassy.multi.common.BoundedTransferQueue;
import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.DisruptorThreadFactory;
import net.engio.mbassy.multi.common.InterruptRunnable;
import net.engio.mbassy.multi.common.LinkedTransferQueue; import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.Pow2;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.disruptor.DeadMessage; import net.engio.mbassy.multi.disruptor.DeadMessage;
import net.engio.mbassy.multi.disruptor.DispatchFactory;
import net.engio.mbassy.multi.disruptor.DispatchHolder;
import net.engio.mbassy.multi.disruptor.DispatchProcessor;
import net.engio.mbassy.multi.disruptor.MessageHolder; import net.engio.mbassy.multi.disruptor.MessageHolder;
import net.engio.mbassy.multi.disruptor.PublicationExceptionHandler;
import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager; import net.engio.mbassy.multi.subscription.SubscriptionManager;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import dorkbox.util.objectPool.PoolableObject; import dorkbox.util.objectPool.PoolableObject;
/** /**
@ -37,30 +50,31 @@ public class MultiMBassador implements IMessageBus {
// any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down
// private final ExecutorService dispatch_Executor; private final ExecutorService dispatch_Executor;
// private final ExecutorService invoke_Executor; private final ExecutorService invoke_Executor;
// private final TransferQueue<Object> dispatchQueue;
// private final Queue<MessageHolder> dispatchQueue; // private final Queue<MessageHolder> dispatchQueue;
// private final BlockingQueue<MessageHolder> dispatchQueue; // private final BlockingQueue<MessageHolder> dispatchQueue;
private final TransferQueue<Object> dispatchQueue;
// private final SynchronousQueue<MessageHolder> dispatchQueue; // private final SynchronousQueue<MessageHolder> dispatchQueue;
// private Queue<ObjectPoolHolder<MessageHolder>> mpmcArrayQueue; // private Queue<ObjectPoolHolder<MessageHolder>> mpmcArrayQueue;
// all threads that are available for asynchronous message dispatching // all threads that are available for asynchronous message dispatching
// private List<InterruptRunnable> invokeRunners; // private List<InterruptRunnable> invokeRunners;
private List<InterruptRunnable> dispatchRunners; // private List<InterruptRunnable> dispatchRunners;
// private dorkbox.util.objectPool.ObjectPool<MessageHolder> pool; // private dorkbox.util.objectPool.ObjectPool<MessageHolder> pool;
// must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads. // must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads.
// private final int dispatch_RingBufferSize = 4; private final int dispatch_RingBufferSize = 2;
// private final int invoke_RingBufferSize = 2048; // private final int invoke_RingBufferSize = 2048;
// private final Disruptor<DispatchHolder> dispatch_Disruptor; private final Disruptor<DispatchHolder> dispatch_Disruptor;
// private final RingBuffer<DispatchHolder> dispatch_RingBuffer; private final RingBuffer<DispatchHolder> dispatch_RingBuffer;
private final WorkerPool<DispatchHolder> dispatch_WorkerPool;
// private final Disruptor<MessageHolder> invoke_Disruptor; // private final Disruptor<MessageHolder> invoke_Disruptor;
// private final RingBuffer<MessageHolder> invoke_RingBuffer; // private final RingBuffer<MessageHolder> invoke_RingBuffer;
@ -76,9 +90,16 @@ public class MultiMBassador implements IMessageBus {
} }
public MultiMBassador() { public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()*2); this(Runtime.getRuntime().availableProcessors());
} }
private final ThreadLocal<DeadMessage> deadMessageCache = new ThreadLocal<DeadMessage>() {
@Override
protected DeadMessage initialValue()
{
return new DeadMessage(null);
}
};
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) { if (numberOfThreads < 1) {
@ -86,7 +107,7 @@ public class MultiMBassador implements IMessageBus {
} }
// this.objectQueue = new LinkedTransferQueue<MessageHolder>(); // this.objectQueue = new LinkedTransferQueue<MessageHolder>();
this.dispatchQueue = new LinkedTransferQueue<Object>(); // this.dispatchQueue = new LinkedTransferQueue<Object>();
// this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads); // this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads);
// this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2));
// this.dispatchQueue = new PTLQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new PTLQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2));
@ -95,13 +116,14 @@ public class MultiMBassador implements IMessageBus {
// this.dispatchQueue = new LinkedBlockingQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads)); // this.dispatchQueue = new LinkedBlockingQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads));
// this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES, int dispatchSize = 2;
// new SynchronousQueue<Runnable>(), this.dispatch_Executor = new ThreadPoolExecutor(dispatchSize, dispatchSize, 1L, TimeUnit.MINUTES,
// new DisruptorThreadFactory("MB_Dispatch")); new LinkedTransferQueue<Runnable>(),
new DisruptorThreadFactory("MB_Dispatch"));
// this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES,
// new SynchronousQueue<Runnable>(), new BoundedTransferQueue<Runnable>(numberOfThreads),
// new DisruptorThreadFactory("MB_Invoke")); new DisruptorThreadFactory("MB_Invoke"));
@ -111,104 +133,117 @@ public class MultiMBassador implements IMessageBus {
int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); // int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2);
this.dispatchRunners = new ArrayList<InterruptRunnable>(dispatchSize); // this.dispatchRunners = new ArrayList<InterruptRunnable>(dispatchSize);
DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); // DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch");
for (int i = 0; i < dispatchSize; i++) { // for (int i = 0; i < dispatchSize; i++) {
// each thread will run forever and process incoming message publication requests // // each thread will run forever and process incoming message publication requests
InterruptRunnable runnable = new InterruptRunnable() { // InterruptRunnable runnable = new InterruptRunnable() {
private final ThreadLocal<DeadMessage> deadMessageCache = new ThreadLocal<DeadMessage>() { // private final ThreadLocal<DeadMessage> deadMessageCache = new ThreadLocal<DeadMessage>() {
@Override // @Override
protected DeadMessage initialValue() // protected DeadMessage initialValue()
{ // {
return new DeadMessage(null); // return new DeadMessage(null);
} // }
}; // };
//
//
@Override // @Override
public void run() { // public void run() {
final MultiMBassador mbassador = MultiMBassador.this; // final MultiMBassador mbassador = MultiMBassador.this;
SubscriptionManager manager = mbassador.subscriptionManager; // SubscriptionManager manager = mbassador.subscriptionManager;
final TransferQueue<Object> IN_queue = mbassador.dispatchQueue; // final TransferQueue<Object> IN_queue = mbassador.dispatchQueue;
// final Queue<MessageHolder> OUT_queue = mbassador.invokeQueue; //// final Queue<MessageHolder> OUT_queue = mbassador.invokeQueue;
//
Object message = null; // Object message = null;
// int counter = 200; //// int counter = 200;
try { // try {
while (this.running) { // while (this.running) {
message = IN_queue.take(); // message = IN_queue.take();
// value = IN_queue.poll(); //// value = IN_queue.poll();
// if (value != null) { //// if (value != null) {
Class<?> messageClass = message.getClass(); // Class<?> messageClass = message.getClass();
//
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); // Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
//
try { // try {
boolean empty = subscriptions.isEmpty(); // boolean empty = subscriptions.isEmpty();
if (empty) { // if (empty) {
//
// Dead Event // // Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); // subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
//
DeadMessage deadMessage = this.deadMessageCache.get(); // DeadMessage deadMessage = this.deadMessageCache.get();
deadMessage.relatedMessages[0] = message; // deadMessage.relatedMessages[0] = message;
message = deadMessage; // message = deadMessage;
empty = subscriptions.isEmpty(); // empty = subscriptions.isEmpty();
} // }
//
if (!empty) { // if (!empty) {
for (Subscription sub : subscriptions) { // for (Subscription sub : subscriptions) {
// boolean handled = false; //// boolean handled = false;
// if (sub.isVarArg()) { //// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method //// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) { //// if (vararg == null) {
// // messy, but the ONLY way to do it. //// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1); //// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message; //// vararg[0] = message;
// // //
// Object[] newInstance = new Object[1]; //// Object[] newInstance = new Object[1];
// newInstance[0] = vararg; //// newInstance[0] = vararg;
// vararg = newInstance; //// vararg = newInstance;
// } //// }
// // //
// handled = true; //// handled = true;
// sub.publishToSubscription(mbassador, vararg); //// sub.publishToSubscription(mbassador, vararg);
// } //// }
// // //
// if (!handled) { //// if (!handled) {
sub.publishToSubscription(mbassador, message); // sub.publishToSubscription(mbassador, message);
// } //// }
} // }
} // }
// counter = 200; //// counter = 200;
} catch (Throwable e) { // } catch (Throwable e) {
mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); // mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
}
// } else {
// if (counter > 100) {
// --counter;
// } else if (counter > 0) {
// --counter;
// Thread.yield();
// } else {
// LockSupport.parkNanos(1L);
// } // }
// } //// } else {
} //// if (counter > 100) {
} catch (InterruptedException e) { //// --counter;
Thread.currentThread().interrupt(); //// } else if (counter > 0) {
return; //// --counter;
} //// Thread.yield();
} //// } else {
}; //// LockSupport.parkNanos(1L);
//// }
Thread runner = dispatchThreadFactory.newThread(runnable); //// }
this.dispatchRunners.add(runnable); // }
runner.start(); // } catch (InterruptedException e) {
} // Thread.currentThread().interrupt();
// return;
// }
// }
// };
//
// Thread runner = dispatchThreadFactory.newThread(runnable);
// this.dispatchRunners.add(runnable);
// runner.start();
// }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
// this.invokeRunners = new ArrayList<InterruptRunnable>(numberOfThreads*2);
// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke");
// for (int i = 0; i < numberOfThreads; i++) {
// // each thread will run forever and process incoming message publication requests
// InterruptRunnable runnable = new InterruptRunnable() {
// @Override
// public void run() {
//
// }
// };
// }
// this.invokeRunners = new ArrayList<InterruptRunnable>(numberOfThreads*2); // this.invokeRunners = new ArrayList<InterruptRunnable>(numberOfThreads*2);
// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); // DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke");
@ -310,22 +345,22 @@ public class MultiMBassador implements IMessageBus {
//////////////////////////// ////////////////////////////
// PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this);
//
// this.dispatch_Disruptor = new Disruptor<DispatchHolder>(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, this.dispatch_Disruptor = new Disruptor<DispatchHolder>(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor,
// ProducerType.MULTI, new SleepingWaitStrategy()); ProducerType.MULTI, new SleepingWaitStrategy());
// this.invoke_Disruptor = new Disruptor<MessageHolder>(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor, // this.invoke_Disruptor = new Disruptor<MessageHolder>(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor,
// ProducerType.MULTI, new SleepingWaitStrategy()); // ProducerType.MULTI, new SleepingWaitStrategy());
//
//
// this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer();
// this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer(); // this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer();
//
// // not too many handlers, so we don't contend the locks in the subscription manager // not too many handlers, so we don't contend the locks in the subscription manager
// EventHandler<DispatchHolder> dispatchHandlers[] = new DispatchProcessor[4]; WorkHandler<DispatchHolder> dispatchHandlers[] = new DispatchProcessor[dispatchSize];
// for (int i = 0; i < dispatchHandlers.length; i++) { for (int i = 0; i < dispatchHandlers.length; i++) {
// dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue); dispatchHandlers[i] = new DispatchProcessor(i, dispatchHandlers.length, this.invoke_Executor);
// } }
// WorkHandler<MessageHolder> invokeHandlers[] = new InvokeProcessor[numberOfThreads]; // WorkHandler<MessageHolder> invokeHandlers[] = new InvokeProcessor[numberOfThreads];
// for (int i = 0; i < invokeHandlers.length; i++) { // for (int i = 0; i < invokeHandlers.length; i++) {
@ -333,12 +368,12 @@ public class MultiMBassador implements IMessageBus {
// } // }
// //
// this.dispatch_Disruptor.handleEventsWith(dispatchHandlers); // this.dispatch_Disruptor.handleEventsWith(dispatchHandlers);
// this.invoke_WorkerPool = new WorkerPool<MessageHolder>(this.invoke_RingBuffer, this.dispatch_WorkerPool = new WorkerPool<DispatchHolder>(this.dispatch_RingBuffer,
// this.invoke_RingBuffer.newBarrier(), this.dispatch_RingBuffer.newBarrier(),
// loggingExceptionHandler, loggingExceptionHandler,
// invokeHandlers); dispatchHandlers);
// //
// this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences()); this.dispatch_RingBuffer.addGatingSequences(this.dispatch_WorkerPool.getWorkerSequences());
///////////////////////////////// /////////////////////////////////
// //
@ -422,9 +457,9 @@ public class MultiMBassador implements IMessageBus {
} }
public final MultiMBassador start() { public final MultiMBassador start() {
// this.invoke_WorkerPool.start(this.invoke_Executor); this.dispatch_WorkerPool.start(this.dispatch_Executor);
// this.invoke_Disruptor.start(); // this.invoke_Disruptor.start();
// this.dispatch_Disruptor.start(); this.dispatch_Disruptor.start();
return this; return this;
} }
@ -456,22 +491,24 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public boolean hasPendingMessages() { public boolean hasPendingMessages() {
return this.pendingMessages.get() > 0L;
// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; // return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize;
return !this.dispatchQueue.isEmpty(); // return !this.dispatchQueue.isEmpty();
} }
@Override @Override
public void shutdown() { public void shutdown() {
for (InterruptRunnable runnable : this.dispatchRunners) { // for (InterruptRunnable runnable : this.dispatchRunners) {
runnable.stop(); // runnable.stop();
} // }
// for (InterruptRunnable runnable : this.invokeRunners) { // for (InterruptRunnable runnable : this.invokeRunners) {
// runnable.stop(); // runnable.stop();
// } // }
// this.dispatch_Disruptor.shutdown(); this.dispatch_Disruptor.shutdown();
// this.dispatch_Executor.shutdown(); this.dispatch_Executor.shutdown();
this.invoke_Executor.shutdown();
} }
@ -748,65 +785,113 @@ public class MultiMBassador implements IMessageBus {
// } // }
} }
private final AtomicLong pendingMessages = new AtomicLong(0);
@Override @Override
public void publishAsync(Object message) { public void publishAsync(final Object message) {
if (message != null) { if (message != null) {
// // put this on the disruptor ring buffer // put this on the disruptor ring buffer
// final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer; final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
Runnable runnable = new Runnable() {
@Override
public void run() {
// System.err.println("invoke");
Object localMessage = message;
Class<?> messageClass = localMessage.getClass();
SubscriptionManager subscriptionManager = MultiMBassador.this.subscriptionManager;
Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = MultiMBassador.this.deadMessageCache.get();
localMessage = new DeadMessage(message);
empty = subscriptions.isEmpty();
}
if (!empty) {
for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
// //
// // setup the job // Object[] newInstance = new Object[1];
// final long seq = ringBuffer.next(); // newInstance[0] = vararg;
// try { // vararg = newInstance;
// DispatchHolder eventJob = ringBuffer.get(seq); // }
// eventJob.messageType = MessageType.ONE; //
// eventJob.message1 = message; // handled = true;
// } catch (Throwable e) { // sub.publishToSubscription(mbassador, vararg);
// handlePublicationError(new PublicationError() // }
// .setMessage("Error while adding an asynchronous message") //
// .setCause(e) // if (!handled) {
// .setPublishedObject(message)); sub.publishToSubscription(MultiMBassador.this, localMessage);
// } finally { // }
// // always publish the job }
// ringBuffer.publish(seq); }
// }
MultiMBassador.this.pendingMessages.getAndDecrement();
}
};
DispatchHolder eventJob = ringBuffer.get(seq);
eventJob.runnable = runnable;
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
this.pendingMessages.getAndIncrement();
// System.err.println("adding " + this.pendingMessages.getAndIncrement());
// MessageHolder messageHolder = new MessageHolder(); // MessageHolder messageHolder = new MessageHolder();
// messageHolder.messageType = MessageType.ONE; // messageHolder.messageType = MessageType.ONE;
// messageHolder.message1 = message; // messageHolder.message1 = message;
// new Runnable() {
// @Override // try {
// public void run() { // this.dispatchQueue.transfer(message);
// //
// } //// int counter = 200;
// }; //// while (!this.dispatchQueue.offer(messageHolder)) {
//// if (counter > 100) {
try { //// --counter;
this.dispatchQueue.transfer(message); //// } else if (counter > 0) {
//// --counter;
// int counter = 200; //// Thread.yield();
// while (!this.dispatchQueue.offer(messageHolder)) { //// } else {
// if (counter > 100) { //// LockSupport.parkNanos(1L);
// --counter; //// }
// } else if (counter > 0) { //// }
// --counter; //
// Thread.yield(); //
// } else { // } catch (InterruptedException e) {
// LockSupport.parkNanos(1L); // e.printStackTrace();
// } // // log.error(e);
//
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// } // }
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
} }
} }

View File

@ -42,10 +42,20 @@ public final class BoundedTransferQueue<E> extends AbstractQueue<E> implements T
@Override @Override
public boolean offer(E e) { public boolean offer(E e) {
if(tryDecrementCapacity()) { // if(tryDecrementCapacity()) {
return this._queue.offer(e); // return this._queue.offer(e);
} // }
return false; // return false;
try {
if (tryDecrementCapacity()) {
this._queue.put(e);
} else {
this._queue.transfer(e);
this._remainingCapacity.decrementAndGet();
}
} catch (InterruptedException e2) {}
return true;
} }
@Override @Override

View File

@ -6,12 +6,8 @@ package net.engio.mbassy.multi.disruptor;
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class DispatchHolder { public class DispatchHolder {
public MessageType messageType = MessageType.ONE;
public Object message1 = null; public Runnable runnable = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public DispatchHolder() { public DispatchHolder() {
} }

View File

@ -1,46 +1,41 @@
package net.engio.mbassy.multi.disruptor; package net.engio.mbassy.multi.disruptor;
import java.util.Collection; import java.util.concurrent.ExecutorService;
import java.util.Queue;
import net.engio.mbassy.multi.MultiMBassador; import com.lmax.disruptor.WorkHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import com.lmax.disruptor.EventHandler;
/** /**
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
*/ */
public class DispatchProcessor implements EventHandler<DispatchHolder> { public class DispatchProcessor implements WorkHandler<DispatchHolder> {
private final MultiMBassador publisher;
private final long ordinal; private final long ordinal;
private final long numberOfConsumers; private final long numberOfConsumers;
private final SubscriptionManager subscriptionManager; private final ExecutorService invoke_Executor;
// private final RingBuffer<MessageHolder> invoke_RingBuffer;
private final Queue<MessageHolder> queue;
public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers,
final SubscriptionManager subscriptionManager, Queue<MessageHolder> queue) {
this.publisher = publisher;
public DispatchProcessor(final long ordinal, final long numberOfConsumers,
final ExecutorService invoke_Executor) {
this.ordinal = ordinal; this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers; this.numberOfConsumers = numberOfConsumers;
this.subscriptionManager = subscriptionManager; this.invoke_Executor = invoke_Executor;
this.queue = queue;
} }
@Override @Override
public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception { public void onEvent(DispatchHolder event) throws Exception {
if (sequence % this.numberOfConsumers == this.ordinal) { // if (sequence % this.numberOfConsumers == this.ordinal) {
// System.err.println("handoff -" + this.ordinal);
this.invoke_Executor.submit(event.runnable);
// event.runnable.run();
// Process the event // Process the event
// switch (event.messageType) { // switch (event.messageType) {
// case ONE: { // case ONE: {
publish(event.message1); // publish(event.message1);
event.message1 = null; // cleanup // event.message1 = null; // cleanup
// return; // return;
// } // }
// case TWO: { // case TWO: {
@ -63,75 +58,75 @@ public class DispatchProcessor implements EventHandler<DispatchHolder> {
// } // }
// } // }
} // }
} }
private void publish(Object message) { // private void publish(Object message) {
Class<?> messageClass = message.getClass(); // Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
try {
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
message = new DeadMessage(message);
empty = subscriptions.isEmpty();
}
if (!empty) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.invoke_RingBuffer;
// //
// // setup the job // SubscriptionManager manager = this.subscriptionManager;
// final long seq = ringBuffer.next(); // Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
// try { //
// MessageHolder eventJob = ringBuffer.get(seq); // try {
// eventJob.messageType = MessageType.ONE; // boolean empty = subscriptions.isEmpty();
// eventJob.message1 = message; // if (empty) {
// eventJob.subscriptions = subscriptions; // // Dead Event
// } catch (Throwable e) { // subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// this.publisher.handlePublicationError(new PublicationError() //
// .setMessage("Error while adding an asynchronous message") // message = new DeadMessage(message);
// .setCause(e) //
// .setPublishedObject(message)); // empty = subscriptions.isEmpty();
// } finally { // }
// // always publish the job //
// ringBuffer.publish(seq); // if (!empty) {
// } //// // put this on the disruptor ring buffer
//// final RingBuffer<MessageHolder> ringBuffer = this.invoke_RingBuffer;
////
//// // setup the job
// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are //// final long seq = ringBuffer.next();
// ObjectPoolHolder<MessageHolder> messageHolder = this.pool.take(); //// try {
// MessageHolder value = messageHolder.getValue(); //// MessageHolder eventJob = ringBuffer.get(seq);
MessageHolder messageHolder = new MessageHolder(); //// eventJob.messageType = MessageType.ONE;
messageHolder.subscriptions= subscriptions; //// eventJob.message1 = message;
messageHolder.messageType = MessageType.ONE; //// eventJob.subscriptions = subscriptions;
messageHolder.message1 = message; //// } catch (Throwable e) {
//// this.publisher.handlePublicationError(new PublicationError()
// this.queue.put(messageHolder); //// .setMessage("Error while adding an asynchronous message")
//// .setCause(e)
// int counter = 200; //// .setPublishedObject(message));
// while (!this.queue.offer(messageHolder)) { //// } finally {
// if (counter > 100) { //// // always publish the job
// --counter; //// ringBuffer.publish(seq);
// } else if (counter > 0) { //// }
// --counter; //
// Thread.yield(); //
// } else { //
// LockSupport.parkNanos(1L); //// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are
// } //// ObjectPoolHolder<MessageHolder> messageHolder = this.pool.take();
// } //// MessageHolder value = messageHolder.getValue();
} // MessageHolder messageHolder = new MessageHolder();
} catch (Throwable e) { // messageHolder.subscriptions= subscriptions;
this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) // messageHolder.messageType = MessageType.ONE;
.setPublishedObject(message)); // messageHolder.message1 = message;
} //
} //// this.queue.put(messageHolder);
//
//// int counter = 200;
//// while (!this.queue.offer(messageHolder)) {
//// if (counter > 100) {
//// --counter;
//// } else if (counter > 0) {
//// --counter;
//// Thread.yield();
//// } else {
//// LockSupport.parkNanos(1L);
//// }
//// }
// }
// } catch (Throwable e) {
// this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e)
// .setPublishedObject(message));
// }
// }
} }