Cleaned/removed disruptor

This commit is contained in:
nathan 2015-02-13 17:59:13 +01:00
parent 9b0e0c2bb6
commit d1338ad46c
18 changed files with 175 additions and 666 deletions

View File

@ -0,0 +1,83 @@
package net.engio.mbassy.multi;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class DispatchRunnable implements Runnable {
private ErrorHandlingSupport errorHandler;
private TransferQueue<Object> dispatchQueue;
private TransferQueue<Runnable> invokeQueue;
private SubscriptionManager manager;
public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager,
TransferQueue<Object> dispatchQueue, TransferQueue<Runnable> invokeQueue) {
this.errorHandler = errorHandler;
this.manager = subscriptionManager;
this.dispatchQueue = dispatchQueue;
this.invokeQueue = invokeQueue;
}
@Override
public void run() {
final SubscriptionManager manager = this.manager;
final ErrorHandlingSupport errorHandler = this.errorHandler;
final TransferQueue<Object> IN_queue = this.dispatchQueue;
final TransferQueue<Runnable> OUT_queue = this.invokeQueue;
Object message = null;
int counter;
while (true) {
try {
counter = MultiMBassador.WORK_RUN_BLITZ;
while ((message = IN_queue.poll()) == null) {
if (counter > MultiMBassador.WORK_RUN_BLITZ_DIV2) {
--counter;
Thread.yield();
} else if (counter > 0) {
--counter;
LockSupport.parkNanos(1L);
} else {
message = IN_queue.take();
break;
}
}
Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message);
message = deadMessage;
empty = subscriptions.isEmpty();
}
if (!empty) {
Runnable e = new InvokeRunnable(errorHandler, subscriptions, message);
OUT_queue.transfer(e);
}
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
}
}
}
}

View File

@ -0,0 +1,54 @@
package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.Collection;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.subscription.Subscription;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public class InvokeRunnable implements Runnable {
private ErrorHandlingSupport errorHandler;
private Collection<Subscription> subscriptions;
private Object message;
public InvokeRunnable(ErrorHandlingSupport errorHandler, Collection<Subscription> subscriptions, Object message) {
this.errorHandler = errorHandler;
this.subscriptions = subscriptions;
this.message = message;
}
@Override
public void run() {
ErrorHandlingSupport errorHandler = this.errorHandler;
Collection<Subscription> subs = this.subscriptions;
Object message = this.message;
Object[] vararg = null;
for (Subscription sub : subs) {
boolean handled = false;
if (sub.isVarArg()) {
// messageClass will NEVER be an array to begin with, since that will call the multi-arg method
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(errorHandler, vararg);
}
if (!handled) {
sub.publishToSubscription(errorHandler, message);
}
}
}
}

View File

@ -1,20 +1,17 @@
package net.engio.mbassy.multi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DisruptorThreadFactory;
import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.Pow2;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.disruptor.DeadMessage;
import net.engio.mbassy.multi.disruptor.MessageHolder;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import dorkbox.util.objectPool.PoolableObject;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
@ -25,60 +22,28 @@ import dorkbox.util.objectPool.PoolableObject;
*/
public class MultiMBassador implements IMessageBus {
// private static final DisruptorThreadFactory THREAD_FACTORY = new DisruptorThreadFactory("MPMC");
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
// any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down
// private final ExecutorService dispatch_Executor;
// private final ExecutorService invoke_Executor;
// private final Queue<MessageHolder> dispatchQueue;
// private final BlockingQueue<MessageHolder> dispatchQueue;
private final TransferQueue<Object> dispatchQueue;
private final TransferQueue<Runnable> invokeQueue;
// private final SynchronousQueue<MessageHolder> dispatchQueue;
// private Queue<ObjectPoolHolder<MessageHolder>> mpmcArrayQueue;
// all threads that are available for asynchronous message dispatching
private List<Thread> threads;
// private dorkbox.util.objectPool.ObjectPool<MessageHolder> pool;
// must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads.
// private final int dispatch_RingBufferSize = 4;
// private final int invoke_RingBufferSize = 2048;
// private final Disruptor<DispatchHolder> dispatch_Disruptor;
// private final RingBuffer<DispatchHolder> dispatch_RingBuffer;
// private final Disruptor<MessageHolder> invoke_Disruptor;
// private final RingBuffer<MessageHolder> invoke_RingBuffer;
// private final WorkerPool<MessageHolder> invoke_WorkerPool;
public static class HolderPoolable implements PoolableObject<MessageHolder> {
@Override
public MessageHolder create() {
return new MessageHolder();
}
}
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()*2);
this(Runtime.getRuntime().availableProcessors());
}
// private long counter = 0L;
public static final int WORK_RUN_BLITZ = 50;
public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2;
public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) {
@ -96,118 +61,18 @@ public class MultiMBassador implements IMessageBus {
// this.dispatchQueue = new SynchronousQueue<MessageHolder>();
// this.dispatchQueue = new LinkedBlockingQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads));
// this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES,
// new SynchronousQueue<Runnable>(),
// new DisruptorThreadFactory("MB_Dispatch"));
// this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES,
// new SynchronousQueue<Runnable>(),
// new DisruptorThreadFactory("MB_Invoke"));
this.subscriptionManager = new SubscriptionManager();
// this.mpmcArrayQueue = new MpmcArrayQueue<ObjectPoolHolder<MessageHolder>>(numberOfThreads*2);
// this.pool = ObjectPoolFactory.create(new HolderPoolable(), numberOfThreads*2);
int dispatchSize = 2;
// int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads);
int invokeSize = numberOfThreads*2-dispatchSize;
int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads*2);
this.threads = new ArrayList<Thread>(dispatchSize + invokeSize);
DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch");
for (int i = 0; i < dispatchSize; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public void run() {
final MultiMBassador mbassador = MultiMBassador.this;
SubscriptionManager manager = mbassador.subscriptionManager;
final TransferQueue<Object> IN_queue = mbassador.dispatchQueue;
final TransferQueue<Runnable> OUT_queue = mbassador.invokeQueue;
Object message = null;
while (true) {
try {
message = IN_queue.take();
Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message);
message = deadMessage;
empty = subscriptions.isEmpty();
}
if (!empty) {
final Collection<Subscription> finalSubs = subscriptions;
final Object finalMessage = message;
Runnable e = new Runnable() {
@Override
public void run() {
// MultiMBassador mbassador = MultiMBassador.this;
Collection<Subscription> subs = finalSubs;
for (Subscription sub : subs) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
//
// Object[] newInstance = new Object[1];
// newInstance[0] = vararg;
// vararg = newInstance;
// }
// handled = true;
// sub.publishToSubscription(mbassador, vararg);
// }
//
// if (!handled) {
sub.publishToSubscription(mbassador, finalMessage);
// }
}
}
};
// counter = 5;
// while (!OUT_queue.offer(e)) {
// if (counter > 3) {
// --counter;
// Thread.yield();
// } else if (counter > 0) {
// --counter;
// Thread.yield();
//// LockSupport.parkNanos(1L);
// } else {
OUT_queue.transfer(e);
// break;
// }
// }
// OUT_queue.transfer(e);
// OUT_queue.put(e);
}
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
}
}
}
};
Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue, this.invokeQueue);
Thread runner = dispatchThreadFactory.newThread(runnable);
this.threads.add(runner);
@ -219,17 +84,36 @@ public class MultiMBassador implements IMessageBus {
for (int i = 0; i < invokeSize; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings("null")
@Override
public void run() {
final MultiMBassador mbassador = MultiMBassador.this;
final TransferQueue<Runnable> IN_queue = mbassador.invokeQueue;
try {
Runnable runnable = null;
int counter;
while (true) {
IN_queue.take().run();
runnable = null;
counter = WORK_RUN_BLITZ;
while ((runnable = IN_queue.poll()) == null) {
if (counter > WORK_RUN_BLITZ_DIV2) {
--counter;
Thread.yield();
} else if (counter > 0) {
--counter;
LockSupport.parkNanos(1L);
} else {
runnable = IN_queue.take();
break;
}
}
runnable.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
@ -239,142 +123,8 @@ public class MultiMBassador implements IMessageBus {
this.threads.add(runner);
runner.start();
}
////////////////////////////
// PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this);
//
// this.dispatch_Disruptor = new Disruptor<DispatchHolder>(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor,
// ProducerType.MULTI, new SleepingWaitStrategy());
// this.invoke_Disruptor = new Disruptor<MessageHolder>(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor,
// ProducerType.MULTI, new SleepingWaitStrategy());
//
//
// this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer();
// this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer();
//
// // not too many handlers, so we don't contend the locks in the subscription manager
// EventHandler<DispatchHolder> dispatchHandlers[] = new DispatchProcessor[4];
// for (int i = 0; i < dispatchHandlers.length; i++) {
// dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue);
// }
// WorkHandler<MessageHolder> invokeHandlers[] = new InvokeProcessor[numberOfThreads];
// for (int i = 0; i < invokeHandlers.length; i++) {
// invokeHandlers[i] = new InvokeProcessor(this);
// }
//
// this.dispatch_Disruptor.handleEventsWith(dispatchHandlers);
// this.invoke_WorkerPool = new WorkerPool<MessageHolder>(this.invoke_RingBuffer,
// this.invoke_RingBuffer.newBarrier(),
// loggingExceptionHandler,
// invokeHandlers);
//
// this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences());
/////////////////////////////////
//
// this.runners = new ArrayList<InterruptRunnable>(numberOfThreads);
// for (int i = 0; i < numberOfThreads; i++) {
// // each thread will run forever and process incoming message publication requests
// InterruptRunnable runnable = new InterruptRunnable() {
// @Override
// public void run() {
// final int DEFAULT_RETRIES = 200;
// final MultiMBassador mbassador = MultiMBassador.this;
// final Queue<ObjectPoolHolder<MessageHolder>> queue = mbassador.mpmcArrayQueue;
// final ObjectPool<MessageHolder> pool2 = mbassador.pool;
//
// int counter = DEFAULT_RETRIES;
// ObjectPoolHolder<MessageHolder> holder = null;
// MessageHolder value;
//
// while (this.running) {
// holder = queue.poll();
// if (holder != null) {
// // sends off to an executor
// value = holder.getValue();
// Collection<Subscription> subscriptions = value.subscriptionsHolder;
// Object message = value.message1;
// Class<? extends Object> messageClass = message.getClass();
//
// if (messageClass.equals(DeadMessage.class)) {
// for (Subscription sub : subscriptions) {
// sub.publishToSubscription(mbassador, message);
// }
// } else {
// Object[] vararg = null;
//
// for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
//
// Object[] newInstance = new Object[1];
// newInstance[0] = vararg;
// vararg = newInstance;
// }
//
// handled = true;
// sub.publishToSubscription(mbassador, vararg);
// }
//
// if (!handled) {
// sub.publishToSubscription(mbassador, message);
// }
// }
// }
//
// pool2.release(holder);
// counter = DEFAULT_RETRIES;
// } else {
// if (counter > 100) {
// --counter;
// } else if (counter > 0) {
// --counter;
// Thread.yield();
// } else {
// LockSupport.parkNanos(1L);
// counter = DEFAULT_RETRIES;
// }
// }
//
//// handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",holder));
// }
// }
// };
// Thread runner = THREAD_FACTORY.newThread(runnable);
// this.runners.add(runnable);
// runner.start();
// }
}
public final MultiMBassador start() {
// this.invoke_WorkerPool.start(this.invoke_Executor);
// this.invoke_Disruptor.start();
// this.dispatch_Disruptor.start();
return this;
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this.errorHandlers) {

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.disruptor;
package net.engio.mbassy.multi.common;
/**
* The dead message event is published whenever no message

View File

@ -1,18 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class DispatchFactory implements EventFactory<DispatchHolder> {
public DispatchFactory() {
}
@Override
public DispatchHolder newInstance() {
return new DispatchHolder();
}
}

View File

@ -1,18 +0,0 @@
package net.engio.mbassy.multi.disruptor;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class DispatchHolder {
public MessageType messageType = MessageType.ONE;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public DispatchHolder() {
}
}

View File

@ -1,137 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import java.util.Collection;
import java.util.Queue;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import com.lmax.disruptor.EventHandler;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public class DispatchProcessor implements EventHandler<DispatchHolder> {
private final MultiMBassador publisher;
private final long ordinal;
private final long numberOfConsumers;
private final SubscriptionManager subscriptionManager;
// private final RingBuffer<MessageHolder> invoke_RingBuffer;
private final Queue<MessageHolder> queue;
public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers,
final SubscriptionManager subscriptionManager, Queue<MessageHolder> queue) {
this.publisher = publisher;
this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers;
this.subscriptionManager = subscriptionManager;
this.queue = queue;
}
@Override
public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception {
if (sequence % this.numberOfConsumers == this.ordinal) {
// Process the event
// switch (event.messageType) {
// case ONE: {
publish(event.message1);
event.message1 = null; // cleanup
// return;
// }
// case TWO: {
// // publisher.publish(this.message1, this.message2);
// event.message1 = null; // cleanup
// event.message2 = null; // cleanup
// return;
// }
// case THREE: {
// // publisher.publish(this.message1, this.message2, this.message3);
// event.message1 = null; // cleanup
// event.message2 = null; // cleanup
// event.message3 = null; // cleanup
// return;
// }
// case ARRAY: {
// // publisher.publish(this.messages);
// event.messages = null; // cleanup
// return;
// }
// }
}
}
private void publish(Object message) {
Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
try {
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
message = new DeadMessage(message);
empty = subscriptions.isEmpty();
}
if (!empty) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.invoke_RingBuffer;
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.ONE;
// eventJob.message1 = message;
// eventJob.subscriptions = subscriptions;
// } catch (Throwable e) {
// this.publisher.handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are
// ObjectPoolHolder<MessageHolder> messageHolder = this.pool.take();
// MessageHolder value = messageHolder.getValue();
MessageHolder messageHolder = new MessageHolder();
messageHolder.subscriptions= subscriptions;
messageHolder.messageType = MessageType.ONE;
messageHolder.message1 = message;
// this.queue.put(messageHolder);
// int counter = 200;
// while (!this.queue.offer(messageHolder)) {
// if (counter > 100) {
// --counter;
// } else if (counter > 0) {
// --counter;
// Thread.yield();
// } else {
// LockSupport.parkNanos(1L);
// }
// }
}
} catch (Throwable e) {
this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e)
.setPublishedObject(message));
}
}
}

View File

@ -1,18 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class InvokeFactory implements EventFactory<MessageHolder> {
public InvokeFactory() {
}
@Override
public MessageHolder newInstance() {
return new MessageHolder();
}
}

View File

@ -1,85 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import java.util.Collection;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.subscription.Subscription;
import com.lmax.disruptor.WorkHandler;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class InvokeProcessor implements WorkHandler<MessageHolder> {
private final MultiMBassador publisher;
public InvokeProcessor(final MultiMBassador publisher) {
this.publisher = publisher;
}
@Override
public void onEvent(MessageHolder event) throws Exception {
// switch (event.messageType) {
// case ONE: {
publish(event.subscriptions, event.message1);
event.message1 = null; // cleanup
event.subscriptions = null;
// return;
// }
// case TWO: {
//// publisher.publish(this.message1, this.message2);
// event.message1 = null; // cleanup
// event.message2 = null; // cleanup
// return;
// }
// case THREE: {
//// publisher.publish(this.message1, this.message2, this.message3);
// event.message1 = null; // cleanup
// event.message2 = null; // cleanup
// event.message3 = null; // cleanup
// return;
// }
// case ARRAY: {
//// publisher.publish(this.messages);
// event.messages = null; // cleanup
// return;
// }
// }
}
private void publish(Collection<Subscription> subscriptions, Object message) {
Class<? extends Object> messageClass = message.getClass();
if (messageClass.equals(DeadMessage.class)) {
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this.publisher, message);
}
} else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
//
// Object[] newInstance = new Object[1];
// newInstance[0] = vararg;
// vararg = newInstance;
// }
//
// handled = true;
// sub.publishToSubscription(this.publisher, vararg);
// }
// if (!handled) {
sub.publishToSubscription(this.publisher, message);
// }
}
}
}
}

View File

@ -1,53 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import java.util.Collection;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.subscription.Subscription;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MessageHolder {
public MessageType messageType = MessageType.ONE;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public Collection<Subscription> subscriptions;
public MessageHolder() {
}
public void publish(MultiMBassador publisher) {
//
// switch (this.messageType) {
// case ONE: {
// publisher.publishExecutor(this.message1);
// this.message1 = null; // cleanup
// return;
// }
// case TWO: {
// publisher.publish(this.message1, this.message2);
// this.message1 = null; // cleanup
// this.message2 = null; // cleanup
// return;
// }
// case THREE: {
// publisher.publish(this.message1, this.message2, this.message3);
// this.message1 = null; // cleanup
// this.message2 = null; // cleanup
// this.message3 = null; // cleanup
// return;
// }
// case ARRAY: {
// publisher.publish(this.messages);
// this.messages = null; // cleanup
// return;
// }
// }
}
}

View File

@ -1,9 +0,0 @@
package net.engio.mbassy.multi.disruptor;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public enum MessageType {
ONE, TWO, THREE, ARRAY
}

View File

@ -1,35 +0,0 @@
package net.engio.mbassy.multi.disruptor;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import com.lmax.disruptor.ExceptionHandler;
public final class PublicationExceptionHandler implements ExceptionHandler {
private final ErrorHandlingSupport errorHandler;
public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void handleEventException(final Throwable e, final long sequence, final Object event) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")")
.setCause(e));
}
@Override
public void handleOnStartException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error starting the disruptor")
.setCause(e));
}
@Override
public void handleOnShutdownException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error stopping the disruptor")
.setCause(e));
}
}

View File

@ -3,8 +3,6 @@ package net.engio.mbassy.multi;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest;
@ -20,7 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test
public void testSingleThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MultiMBassador().start();
IMessageBus fifoBUs = new MultiMBassador();
List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){
@ -55,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test
public void testSingleThreadedSyncAsyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MultiMBassador(1).start();
IMessageBus fifoBUs = new MultiMBassador(1);
List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){

View File

@ -5,10 +5,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
import net.engio.mbassy.multi.common.TestUtil;
import net.engio.mbassy.multi.disruptor.DeadMessage;
import net.engio.mbassy.multi.listeners.IMessageListener;
import net.engio.mbassy.multi.listeners.MessagesListener;
import net.engio.mbassy.multi.listeners.ObjectListener;

View File

@ -2,7 +2,6 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
@ -103,7 +102,7 @@ public class MBassadorTest extends MessageBusTest {
}
};
final MultiMBassador bus = new MultiMBassador().start();
final MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -20,7 +20,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSending(){
IMessageBus bus = new MultiMBassador().start();
IMessageBus bus = new MultiMBassador();
Listener listener1 = new Listener();
bus.subscribe(listener1);

View File

@ -2,8 +2,6 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
@ -31,7 +29,7 @@ public class SyncBusTest extends MessageBusTest {
@Test
public void testSynchronousMessagePublication() throws Exception {
final IMessageBus bus = new MultiMBassador().start();
final IMessageBus bus = new MultiMBassador();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -84,7 +82,7 @@ public class SyncBusTest extends MessageBusTest {
}
};
final IMessageBus bus = new MultiMBassador().start();
final IMessageBus bus = new MultiMBassador();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -42,13 +42,13 @@ public abstract class MessageBusTest extends AssertSupport {
public MultiMBassador createBus() {
MultiMBassador bus = new MultiMBassador().start();
MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(TestFailingHandler);
return bus;
}
public MultiMBassador createBus(ListenerFactory listeners) {
MultiMBassador bus = new MultiMBassador().start();
MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
return bus;