diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index aa07500..c9185e6 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -3,19 +3,32 @@ package net.engio.mbassy.multi; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import net.engio.mbassy.multi.common.BoundedTransferQueue; import net.engio.mbassy.multi.common.DisruptorThreadFactory; -import net.engio.mbassy.multi.common.InterruptRunnable; import net.engio.mbassy.multi.common.LinkedTransferQueue; -import net.engio.mbassy.multi.common.Pow2; -import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.disruptor.DeadMessage; +import net.engio.mbassy.multi.disruptor.DispatchFactory; +import net.engio.mbassy.multi.disruptor.DispatchHolder; +import net.engio.mbassy.multi.disruptor.DispatchProcessor; import net.engio.mbassy.multi.disruptor.MessageHolder; +import net.engio.mbassy.multi.disruptor.PublicationExceptionHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.SubscriptionManager; + +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.WorkHandler; +import com.lmax.disruptor.WorkerPool; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + import dorkbox.util.objectPool.PoolableObject; /** @@ -37,30 +50,31 @@ public class MultiMBassador implements IMessageBus { // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down -// private final ExecutorService dispatch_Executor; -// private final ExecutorService invoke_Executor; + private final ExecutorService dispatch_Executor; + private final ExecutorService invoke_Executor; +// private final TransferQueue dispatchQueue; // private final Queue dispatchQueue; // private final BlockingQueue dispatchQueue; - private final TransferQueue dispatchQueue; // private final SynchronousQueue dispatchQueue; // private Queue> mpmcArrayQueue; // all threads that are available for asynchronous message dispatching // private List invokeRunners; - private List dispatchRunners; +// private List dispatchRunners; // private dorkbox.util.objectPool.ObjectPool pool; // must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads. -// private final int dispatch_RingBufferSize = 4; + private final int dispatch_RingBufferSize = 2; // private final int invoke_RingBufferSize = 2048; -// private final Disruptor dispatch_Disruptor; -// private final RingBuffer dispatch_RingBuffer; + private final Disruptor dispatch_Disruptor; + private final RingBuffer dispatch_RingBuffer; + private final WorkerPool dispatch_WorkerPool; // private final Disruptor invoke_Disruptor; // private final RingBuffer invoke_RingBuffer; @@ -76,9 +90,16 @@ public class MultiMBassador implements IMessageBus { } public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()*2); + this(Runtime.getRuntime().availableProcessors()); } + private final ThreadLocal deadMessageCache = new ThreadLocal() { + @Override + protected DeadMessage initialValue() + { + return new DeadMessage(null); + } + }; public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { @@ -86,7 +107,7 @@ public class MultiMBassador implements IMessageBus { } // this.objectQueue = new LinkedTransferQueue(); - this.dispatchQueue = new LinkedTransferQueue(); +// this.dispatchQueue = new LinkedTransferQueue(); // this.dispatchQueue = new BoundedTransferQueue(numberOfThreads); // this.dispatchQueue = new MpmcArrayQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new PTLQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); @@ -95,13 +116,14 @@ public class MultiMBassador implements IMessageBus { // this.dispatchQueue = new LinkedBlockingQueue(Pow2.roundToPowerOfTwo(numberOfThreads)); -// this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES, -// new SynchronousQueue(), -// new DisruptorThreadFactory("MB_Dispatch")); + int dispatchSize = 2; + this.dispatch_Executor = new ThreadPoolExecutor(dispatchSize, dispatchSize, 1L, TimeUnit.MINUTES, + new LinkedTransferQueue(), + new DisruptorThreadFactory("MB_Dispatch")); -// this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, -// new SynchronousQueue(), -// new DisruptorThreadFactory("MB_Invoke")); + this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, + new BoundedTransferQueue(numberOfThreads), + new DisruptorThreadFactory("MB_Invoke")); @@ -111,104 +133,117 @@ public class MultiMBassador implements IMessageBus { - int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); - this.dispatchRunners = new ArrayList(dispatchSize); - DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); - for (int i = 0; i < dispatchSize; i++) { - // each thread will run forever and process incoming message publication requests - InterruptRunnable runnable = new InterruptRunnable() { - private final ThreadLocal deadMessageCache = new ThreadLocal() { - @Override - protected DeadMessage initialValue() - { - return new DeadMessage(null); - } - }; - - - @Override - public void run() { - final MultiMBassador mbassador = MultiMBassador.this; - SubscriptionManager manager = mbassador.subscriptionManager; - final TransferQueue IN_queue = mbassador.dispatchQueue; -// final Queue OUT_queue = mbassador.invokeQueue; - - Object message = null; -// int counter = 200; - try { - while (this.running) { - message = IN_queue.take(); -// value = IN_queue.poll(); -// if (value != null) { - Class messageClass = message.getClass(); - - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - - try { - boolean empty = subscriptions.isEmpty(); - if (empty) { - - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - - DeadMessage deadMessage = this.deadMessageCache.get(); - deadMessage.relatedMessages[0] = message; - message = deadMessage; - empty = subscriptions.isEmpty(); - } - - if (!empty) { - for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; - // -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } - // -// handled = true; -// sub.publishToSubscription(mbassador, vararg); -// } - // -// if (!handled) { - sub.publishToSubscription(mbassador, message); -// } - } - } -// counter = 200; - } catch (Throwable e) { - mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); - } -// } else { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); +// int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); +// this.dispatchRunners = new ArrayList(dispatchSize); +// DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); +// for (int i = 0; i < dispatchSize; i++) { +// // each thread will run forever and process incoming message publication requests +// InterruptRunnable runnable = new InterruptRunnable() { +// private final ThreadLocal deadMessageCache = new ThreadLocal() { +// @Override +// protected DeadMessage initialValue() +// { +// return new DeadMessage(null); +// } +// }; +// +// +// @Override +// public void run() { +// final MultiMBassador mbassador = MultiMBassador.this; +// SubscriptionManager manager = mbassador.subscriptionManager; +// final TransferQueue IN_queue = mbassador.dispatchQueue; +//// final Queue OUT_queue = mbassador.invokeQueue; +// +// Object message = null; +//// int counter = 200; +// try { +// while (this.running) { +// message = IN_queue.take(); +//// value = IN_queue.poll(); +//// if (value != null) { +// Class messageClass = message.getClass(); +// +// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); +// +// try { +// boolean empty = subscriptions.isEmpty(); +// if (empty) { +// +// // Dead Event +// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); +// +// DeadMessage deadMessage = this.deadMessageCache.get(); +// deadMessage.relatedMessages[0] = message; +// message = deadMessage; +// empty = subscriptions.isEmpty(); +// } +// +// if (!empty) { +// for (Subscription sub : subscriptions) { +//// boolean handled = false; +//// if (sub.isVarArg()) { +//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method +//// if (vararg == null) { +//// // messy, but the ONLY way to do it. +//// vararg = (Object[]) Array.newInstance(messageClass, 1); +//// vararg[0] = message; +// // +//// Object[] newInstance = new Object[1]; +//// newInstance[0] = vararg; +//// vararg = newInstance; +//// } +// // +//// handled = true; +//// sub.publishToSubscription(mbassador, vararg); +//// } +// // +//// if (!handled) { +// sub.publishToSubscription(mbassador, message); +//// } +// } +// } +//// counter = 200; +// } catch (Throwable e) { +// mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); // } -// } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - }; - - Thread runner = dispatchThreadFactory.newThread(runnable); - this.dispatchRunners.add(runnable); - runner.start(); - } +//// } else { +//// if (counter > 100) { +//// --counter; +//// } else if (counter > 0) { +//// --counter; +//// Thread.yield(); +//// } else { +//// LockSupport.parkNanos(1L); +//// } +//// } +// } +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// return; +// } +// } +// }; +// +// Thread runner = dispatchThreadFactory.newThread(runnable); +// this.dispatchRunners.add(runnable); +// runner.start(); +// } ////////////////////////////////////////////////////// +// this.invokeRunners = new ArrayList(numberOfThreads*2); +// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); +// for (int i = 0; i < numberOfThreads; i++) { +// // each thread will run forever and process incoming message publication requests +// InterruptRunnable runnable = new InterruptRunnable() { +// @Override +// public void run() { +// +// } +// }; +// } + + // this.invokeRunners = new ArrayList(numberOfThreads*2); // DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); @@ -310,22 +345,22 @@ public class MultiMBassador implements IMessageBus { //////////////////////////// -// PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); -// -// this.dispatch_Disruptor = new Disruptor(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, -// ProducerType.MULTI, new SleepingWaitStrategy()); + PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); + + this.dispatch_Disruptor = new Disruptor(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, + ProducerType.MULTI, new SleepingWaitStrategy()); // this.invoke_Disruptor = new Disruptor(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor, // ProducerType.MULTI, new SleepingWaitStrategy()); -// -// -// this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); + + + this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); // this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer(); -// -// // not too many handlers, so we don't contend the locks in the subscription manager -// EventHandler dispatchHandlers[] = new DispatchProcessor[4]; -// for (int i = 0; i < dispatchHandlers.length; i++) { -// dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue); -// } + + // not too many handlers, so we don't contend the locks in the subscription manager + WorkHandler dispatchHandlers[] = new DispatchProcessor[dispatchSize]; + for (int i = 0; i < dispatchHandlers.length; i++) { + dispatchHandlers[i] = new DispatchProcessor(i, dispatchHandlers.length, this.invoke_Executor); + } // WorkHandler invokeHandlers[] = new InvokeProcessor[numberOfThreads]; // for (int i = 0; i < invokeHandlers.length; i++) { @@ -333,12 +368,12 @@ public class MultiMBassador implements IMessageBus { // } // // this.dispatch_Disruptor.handleEventsWith(dispatchHandlers); -// this.invoke_WorkerPool = new WorkerPool(this.invoke_RingBuffer, -// this.invoke_RingBuffer.newBarrier(), -// loggingExceptionHandler, -// invokeHandlers); + this.dispatch_WorkerPool = new WorkerPool(this.dispatch_RingBuffer, + this.dispatch_RingBuffer.newBarrier(), + loggingExceptionHandler, + dispatchHandlers); // -// this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences()); + this.dispatch_RingBuffer.addGatingSequences(this.dispatch_WorkerPool.getWorkerSequences()); ///////////////////////////////// // @@ -422,9 +457,9 @@ public class MultiMBassador implements IMessageBus { } public final MultiMBassador start() { -// this.invoke_WorkerPool.start(this.invoke_Executor); + this.dispatch_WorkerPool.start(this.dispatch_Executor); // this.invoke_Disruptor.start(); -// this.dispatch_Disruptor.start(); + this.dispatch_Disruptor.start(); return this; } @@ -456,22 +491,24 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { + return this.pendingMessages.get() > 0L; // return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; - return !this.dispatchQueue.isEmpty(); +// return !this.dispatchQueue.isEmpty(); } @Override public void shutdown() { - for (InterruptRunnable runnable : this.dispatchRunners) { - runnable.stop(); - } +// for (InterruptRunnable runnable : this.dispatchRunners) { +// runnable.stop(); +// } // for (InterruptRunnable runnable : this.invokeRunners) { // runnable.stop(); // } -// this.dispatch_Disruptor.shutdown(); -// this.dispatch_Executor.shutdown(); + this.dispatch_Disruptor.shutdown(); + this.dispatch_Executor.shutdown(); + this.invoke_Executor.shutdown(); } @@ -748,65 +785,113 @@ public class MultiMBassador implements IMessageBus { // } } + private final AtomicLong pendingMessages = new AtomicLong(0); + @Override - public void publishAsync(Object message) { + public void publishAsync(final Object message) { if (message != null) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.dispatch_RingBuffer; + // put this on the disruptor ring buffer + final RingBuffer ringBuffer = this.dispatch_RingBuffer; + + // setup the job + final long seq = ringBuffer.next(); + try { + Runnable runnable = new Runnable() { + @Override + public void run() { +// System.err.println("invoke"); + Object localMessage = message; + + Class messageClass = localMessage.getClass(); + + SubscriptionManager subscriptionManager = MultiMBassador.this.subscriptionManager; + Collection subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass); + + boolean empty = subscriptions.isEmpty(); + if (empty) { + // Dead Event + subscriptions = subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class); + +// DeadMessage deadMessage = MultiMBassador.this.deadMessageCache.get(); + localMessage = new DeadMessage(message); + empty = subscriptions.isEmpty(); + } + + if (!empty) { + for (Subscription sub : subscriptions) { +// boolean handled = false; +// if (sub.isVarArg()) { +// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method +// if (vararg == null) { +// // messy, but the ONLY way to do it. +// vararg = (Object[]) Array.newInstance(messageClass, 1); +// vararg[0] = message; // -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// DispatchHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } +// Object[] newInstance = new Object[1]; +// newInstance[0] = vararg; +// vararg = newInstance; +// } +// +// handled = true; +// sub.publishToSubscription(mbassador, vararg); +// } +// +// if (!handled) { + sub.publishToSubscription(MultiMBassador.this, localMessage); +// } + } + } + + MultiMBassador.this.pendingMessages.getAndDecrement(); + } + }; + + + DispatchHolder eventJob = ringBuffer.get(seq); + eventJob.runnable = runnable; + } catch (Throwable e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } finally { + // always publish the job + ringBuffer.publish(seq); + } + this.pendingMessages.getAndIncrement(); +// System.err.println("adding " + this.pendingMessages.getAndIncrement()); // MessageHolder messageHolder = new MessageHolder(); // messageHolder.messageType = MessageType.ONE; // messageHolder.message1 = message; -// new Runnable() { -// @Override -// public void run() { + +// try { +// this.dispatchQueue.transfer(message); // -// } -// }; - - try { - this.dispatchQueue.transfer(message); - -// int counter = 200; -// while (!this.dispatchQueue.offer(messageHolder)) { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); -// } +//// int counter = 200; +//// while (!this.dispatchQueue.offer(messageHolder)) { +//// if (counter > 100) { +//// --counter; +//// } else if (counter > 0) { +//// --counter; +//// Thread.yield(); +//// } else { +//// LockSupport.parkNanos(1L); +//// } +//// } +// +// +// } catch (InterruptedException e) { +// e.printStackTrace(); +// // log.error(e); +// +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); // } - - - } catch (InterruptedException e) { - e.printStackTrace(); - // log.error(e); - - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } } } diff --git a/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java b/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java index af13855..e1f21c2 100644 --- a/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java +++ b/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java @@ -42,10 +42,20 @@ public final class BoundedTransferQueue extends AbstractQueue implements T @Override public boolean offer(E e) { - if(tryDecrementCapacity()) { - return this._queue.offer(e); - } - return false; +// if(tryDecrementCapacity()) { +// return this._queue.offer(e); +// } +// return false; + + try { + if (tryDecrementCapacity()) { + this._queue.put(e); + } else { + this._queue.transfer(e); + this._remainingCapacity.decrementAndGet(); + } + } catch (InterruptedException e2) {} + return true; } @Override diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java index 6f9c82a..a8cb540 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java +++ b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java @@ -6,12 +6,8 @@ package net.engio.mbassy.multi.disruptor; * Date: 2/2/15 */ public class DispatchHolder { - public MessageType messageType = MessageType.ONE; - public Object message1 = null; - public Object message2 = null; - public Object message3 = null; - public Object[] messages = null; + public Runnable runnable = null; public DispatchHolder() { } diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java index 85e0442..c486e4e 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java +++ b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java @@ -1,46 +1,41 @@ package net.engio.mbassy.multi.disruptor; -import java.util.Collection; -import java.util.Queue; +import java.util.concurrent.ExecutorService; -import net.engio.mbassy.multi.MultiMBassador; -import net.engio.mbassy.multi.error.PublicationError; -import net.engio.mbassy.multi.subscription.Subscription; -import net.engio.mbassy.multi.subscription.SubscriptionManager; - -import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.WorkHandler; /** * @author dorkbox, llc Date: 2/2/15 */ -public class DispatchProcessor implements EventHandler { - private final MultiMBassador publisher; +public class DispatchProcessor implements WorkHandler { private final long ordinal; private final long numberOfConsumers; - private final SubscriptionManager subscriptionManager; -// private final RingBuffer invoke_RingBuffer; - private final Queue queue; - - public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers, - final SubscriptionManager subscriptionManager, Queue queue) { - this.publisher = publisher; + private final ExecutorService invoke_Executor; + public DispatchProcessor(final long ordinal, final long numberOfConsumers, + final ExecutorService invoke_Executor) { this.ordinal = ordinal; this.numberOfConsumers = numberOfConsumers; - this.subscriptionManager = subscriptionManager; - this.queue = queue; + this.invoke_Executor = invoke_Executor; } + + @Override - public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception { - if (sequence % this.numberOfConsumers == this.ordinal) { + public void onEvent(DispatchHolder event) throws Exception { +// if (sequence % this.numberOfConsumers == this.ordinal) { +// System.err.println("handoff -" + this.ordinal); + + this.invoke_Executor.submit(event.runnable); +// event.runnable.run(); + // Process the event // switch (event.messageType) { // case ONE: { - publish(event.message1); - event.message1 = null; // cleanup +// publish(event.message1); +// event.message1 = null; // cleanup // return; // } // case TWO: { @@ -63,75 +58,75 @@ public class DispatchProcessor implements EventHandler { // } // } - } +// } } - private void publish(Object message) { - Class messageClass = message.getClass(); - - SubscriptionManager manager = this.subscriptionManager; - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - - try { - boolean empty = subscriptions.isEmpty(); - if (empty) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - - message = new DeadMessage(message); - - empty = subscriptions.isEmpty(); - } - - if (!empty) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.invoke_RingBuffer; +// private void publish(Object message) { +// Class messageClass = message.getClass(); // -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// eventJob.subscriptions = subscriptions; -// } catch (Throwable e) { -// this.publisher.handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - - - -// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are -// ObjectPoolHolder messageHolder = this.pool.take(); -// MessageHolder value = messageHolder.getValue(); - MessageHolder messageHolder = new MessageHolder(); - messageHolder.subscriptions= subscriptions; - messageHolder.messageType = MessageType.ONE; - messageHolder.message1 = message; - -// this.queue.put(messageHolder); - -// int counter = 200; -// while (!this.queue.offer(messageHolder)) { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); -// } -// } - } - } catch (Throwable e) { - this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) - .setPublishedObject(message)); - } - } +// SubscriptionManager manager = this.subscriptionManager; +// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); +// +// try { +// boolean empty = subscriptions.isEmpty(); +// if (empty) { +// // Dead Event +// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); +// +// message = new DeadMessage(message); +// +// empty = subscriptions.isEmpty(); +// } +// +// if (!empty) { +//// // put this on the disruptor ring buffer +//// final RingBuffer ringBuffer = this.invoke_RingBuffer; +//// +//// // setup the job +//// final long seq = ringBuffer.next(); +//// try { +//// MessageHolder eventJob = ringBuffer.get(seq); +//// eventJob.messageType = MessageType.ONE; +//// eventJob.message1 = message; +//// eventJob.subscriptions = subscriptions; +//// } catch (Throwable e) { +//// this.publisher.handlePublicationError(new PublicationError() +//// .setMessage("Error while adding an asynchronous message") +//// .setCause(e) +//// .setPublishedObject(message)); +//// } finally { +//// // always publish the job +//// ringBuffer.publish(seq); +//// } +// +// +// +//// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are +//// ObjectPoolHolder messageHolder = this.pool.take(); +//// MessageHolder value = messageHolder.getValue(); +// MessageHolder messageHolder = new MessageHolder(); +// messageHolder.subscriptions= subscriptions; +// messageHolder.messageType = MessageType.ONE; +// messageHolder.message1 = message; +// +//// this.queue.put(messageHolder); +// +//// int counter = 200; +//// while (!this.queue.offer(messageHolder)) { +//// if (counter > 100) { +//// --counter; +//// } else if (counter > 0) { +//// --counter; +//// Thread.yield(); +//// } else { +//// LockSupport.parkNanos(1L); +//// } +//// } +// } +// } catch (Throwable e) { +// this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) +// .setPublishedObject(message)); +// } +// } }