wip, wrapping up

This commit is contained in:
nathan 2015-02-13 17:19:10 +01:00
parent 139f4c3069
commit 9b0e0c2bb6
10 changed files with 2029 additions and 560 deletions

View File

@ -3,32 +3,17 @@ package net.engio.mbassy.multi;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.engio.mbassy.multi.common.BoundedTransferQueue;
import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.DisruptorThreadFactory;
import net.engio.mbassy.multi.common.LinkedTransferQueue; import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.disruptor.DeadMessage; import net.engio.mbassy.multi.disruptor.DeadMessage;
import net.engio.mbassy.multi.disruptor.DispatchFactory;
import net.engio.mbassy.multi.disruptor.DispatchHolder;
import net.engio.mbassy.multi.disruptor.DispatchProcessor;
import net.engio.mbassy.multi.disruptor.MessageHolder; import net.engio.mbassy.multi.disruptor.MessageHolder;
import net.engio.mbassy.multi.disruptor.PublicationExceptionHandler;
import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager; import net.engio.mbassy.multi.subscription.SubscriptionManager;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import dorkbox.util.objectPool.PoolableObject; import dorkbox.util.objectPool.PoolableObject;
/** /**
@ -50,31 +35,30 @@ public class MultiMBassador implements IMessageBus {
// any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down
private final ExecutorService dispatch_Executor; // private final ExecutorService dispatch_Executor;
private final ExecutorService invoke_Executor; // private final ExecutorService invoke_Executor;
// private final TransferQueue<Object> dispatchQueue;
// private final Queue<MessageHolder> dispatchQueue; // private final Queue<MessageHolder> dispatchQueue;
// private final BlockingQueue<MessageHolder> dispatchQueue; // private final BlockingQueue<MessageHolder> dispatchQueue;
private final TransferQueue<Object> dispatchQueue;
private final TransferQueue<Runnable> invokeQueue;
// private final SynchronousQueue<MessageHolder> dispatchQueue; // private final SynchronousQueue<MessageHolder> dispatchQueue;
// private Queue<ObjectPoolHolder<MessageHolder>> mpmcArrayQueue; // private Queue<ObjectPoolHolder<MessageHolder>> mpmcArrayQueue;
// all threads that are available for asynchronous message dispatching // all threads that are available for asynchronous message dispatching
// private List<InterruptRunnable> invokeRunners; private List<Thread> threads;
// private List<InterruptRunnable> dispatchRunners;
// private dorkbox.util.objectPool.ObjectPool<MessageHolder> pool; // private dorkbox.util.objectPool.ObjectPool<MessageHolder> pool;
// must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads. // must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads.
private final int dispatch_RingBufferSize = 2; // private final int dispatch_RingBufferSize = 4;
// private final int invoke_RingBufferSize = 2048; // private final int invoke_RingBufferSize = 2048;
private final Disruptor<DispatchHolder> dispatch_Disruptor; // private final Disruptor<DispatchHolder> dispatch_Disruptor;
private final RingBuffer<DispatchHolder> dispatch_RingBuffer; // private final RingBuffer<DispatchHolder> dispatch_RingBuffer;
private final WorkerPool<DispatchHolder> dispatch_WorkerPool;
// private final Disruptor<MessageHolder> invoke_Disruptor; // private final Disruptor<MessageHolder> invoke_Disruptor;
// private final RingBuffer<MessageHolder> invoke_RingBuffer; // private final RingBuffer<MessageHolder> invoke_RingBuffer;
@ -90,16 +74,11 @@ public class MultiMBassador implements IMessageBus {
} }
public MultiMBassador() { public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()); this(Runtime.getRuntime().availableProcessors()*2);
} }
private final ThreadLocal<DeadMessage> deadMessageCache = new ThreadLocal<DeadMessage>() {
@Override // private long counter = 0L;
protected DeadMessage initialValue()
{
return new DeadMessage(null);
}
};
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) { if (numberOfThreads < 1) {
@ -107,7 +86,9 @@ public class MultiMBassador implements IMessageBus {
} }
// this.objectQueue = new LinkedTransferQueue<MessageHolder>(); // this.objectQueue = new LinkedTransferQueue<MessageHolder>();
// this.dispatchQueue = new LinkedTransferQueue<Object>(); this.dispatchQueue = new LinkedTransferQueue<Object>();
this.invokeQueue = new LinkedTransferQueue<Runnable>();
// this.invokeQueue = new BoundedTransferQueue<Runnable>(numberOfThreads);
// this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads); // this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads);
// this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2));
// this.dispatchQueue = new PTLQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new PTLQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2));
@ -116,14 +97,13 @@ public class MultiMBassador implements IMessageBus {
// this.dispatchQueue = new LinkedBlockingQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads)); // this.dispatchQueue = new LinkedBlockingQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads));
int dispatchSize = 2; // this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES,
this.dispatch_Executor = new ThreadPoolExecutor(dispatchSize, dispatchSize, 1L, TimeUnit.MINUTES, // new SynchronousQueue<Runnable>(),
new LinkedTransferQueue<Runnable>(), // new DisruptorThreadFactory("MB_Dispatch"));
new DisruptorThreadFactory("MB_Dispatch"));
this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, // this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES,
new BoundedTransferQueue<Runnable>(numberOfThreads), // new SynchronousQueue<Runnable>(),
new DisruptorThreadFactory("MB_Invoke")); // new DisruptorThreadFactory("MB_Invoke"));
@ -133,201 +113,132 @@ public class MultiMBassador implements IMessageBus {
// int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); int dispatchSize = 2;
// this.dispatchRunners = new ArrayList<InterruptRunnable>(dispatchSize); // int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads);
// DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); int invokeSize = numberOfThreads*2-dispatchSize;
// for (int i = 0; i < dispatchSize; i++) { this.threads = new ArrayList<Thread>(dispatchSize + invokeSize);
// // each thread will run forever and process incoming message publication requests
// InterruptRunnable runnable = new InterruptRunnable() {
// private final ThreadLocal<DeadMessage> deadMessageCache = new ThreadLocal<DeadMessage>() { DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch");
// @Override for (int i = 0; i < dispatchSize; i++) {
// protected DeadMessage initialValue() // each thread will run forever and process incoming message publication requests
// { Runnable runnable = new Runnable() {
// return new DeadMessage(null); @Override
// } public void run() {
// }; final MultiMBassador mbassador = MultiMBassador.this;
SubscriptionManager manager = mbassador.subscriptionManager;
final TransferQueue<Object> IN_queue = mbassador.dispatchQueue;
final TransferQueue<Runnable> OUT_queue = mbassador.invokeQueue;
Object message = null;
while (true) {
try {
message = IN_queue.take();
Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message);
message = deadMessage;
empty = subscriptions.isEmpty();
}
if (!empty) {
final Collection<Subscription> finalSubs = subscriptions;
final Object finalMessage = message;
Runnable e = new Runnable() {
@Override
public void run() {
// MultiMBassador mbassador = MultiMBassador.this;
Collection<Subscription> subs = finalSubs;
for (Subscription sub : subs) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
// //
// Object[] newInstance = new Object[1];
// newInstance[0] = vararg;
// vararg = newInstance;
// }
// handled = true;
// sub.publishToSubscription(mbassador, vararg);
// }
// //
// @Override // if (!handled) {
// public void run() { sub.publishToSubscription(mbassador, finalMessage);
// final MultiMBassador mbassador = MultiMBassador.this; // }
// SubscriptionManager manager = mbassador.subscriptionManager; }
// final TransferQueue<Object> IN_queue = mbassador.dispatchQueue; }
//// final Queue<MessageHolder> OUT_queue = mbassador.invokeQueue; };
//
// Object message = null;
//// int counter = 200; // counter = 5;
// try { // while (!OUT_queue.offer(e)) {
// while (this.running) { // if (counter > 3) {
// message = IN_queue.take(); // --counter;
//// value = IN_queue.poll(); // Thread.yield();
//// if (value != null) { // } else if (counter > 0) {
// Class<?> messageClass = message.getClass(); // --counter;
// // Thread.yield();
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); //// LockSupport.parkNanos(1L);
// // } else {
// try { OUT_queue.transfer(e);
// boolean empty = subscriptions.isEmpty(); // break;
// if (empty) {
//
// // Dead Event
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
//
// DeadMessage deadMessage = this.deadMessageCache.get();
// deadMessage.relatedMessages[0] = message;
// message = deadMessage;
// empty = subscriptions.isEmpty();
// } // }
//
// if (!empty) {
// for (Subscription sub : subscriptions) {
//// boolean handled = false;
//// if (sub.isVarArg()) {
//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
//// if (vararg == null) {
//// // messy, but the ONLY way to do it.
//// vararg = (Object[]) Array.newInstance(messageClass, 1);
//// vararg[0] = message;
// //
//// Object[] newInstance = new Object[1];
//// newInstance[0] = vararg;
//// vararg = newInstance;
//// }
// //
//// handled = true;
//// sub.publishToSubscription(mbassador, vararg);
//// }
// //
//// if (!handled) {
// sub.publishToSubscription(mbassador, message);
//// }
// }
// }
//// counter = 200;
// } catch (Throwable e) {
// mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
// } // }
//// } else {
//// if (counter > 100) { // OUT_queue.transfer(e);
//// --counter; // OUT_queue.put(e);
//// } else if (counter > 0) { }
//// --counter; } catch (InterruptedException e) {
//// Thread.yield(); return;
//// } else { } catch (Throwable e) {
//// LockSupport.parkNanos(1L); mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
//// } }
//// } }
// } }
// } catch (InterruptedException e) { };
// Thread.currentThread().interrupt();
// return; Thread runner = dispatchThreadFactory.newThread(runnable);
// } this.threads.add(runner);
// } runner.start();
// }; }
//
// Thread runner = dispatchThreadFactory.newThread(runnable);
// this.dispatchRunners.add(runnable);
// runner.start();
// }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
// this.invokeRunners = new ArrayList<InterruptRunnable>(numberOfThreads*2); DisruptorThreadFactory invokeThreadFactory = new DisruptorThreadFactory("MB_Invoke");
// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); for (int i = 0; i < invokeSize; i++) {
// for (int i = 0; i < numberOfThreads; i++) { // each thread will run forever and process incoming message publication requests
// // each thread will run forever and process incoming message publication requests Runnable runnable = new Runnable() {
// InterruptRunnable runnable = new InterruptRunnable() { @Override
// @Override public void run() {
// public void run() { final MultiMBassador mbassador = MultiMBassador.this;
// final TransferQueue<Runnable> IN_queue = mbassador.invokeQueue;
// }
// };
// }
// this.invokeRunners = new ArrayList<InterruptRunnable>(numberOfThreads*2);
// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke");
// for (int i = 0; i < numberOfThreads; i++) {
// // each thread will run forever and process incoming message publication requests
// InterruptRunnable runnable = new InterruptRunnable() {
// @Override
// public void run() {
// final int DEFAULT_RETRIES = 200;
// final MultiMBassador mbassador = MultiMBassador.this;
// final Queue<MessageHolder> queue = mbassador.invokeQueue;
//// final SubscriptionManager manager = mbassador.subscriptionManager;
//// final ObjectPool<MessageHolder> pool2 = mbassador.pool;
//
// int counter = DEFAULT_RETRIES;
//// ObjectPoolHolder<MessageHolder> holder = null;
// MessageHolder value = null;
//
// while (this.running) {
// value = queue.poll();
// if (value != null) {
// // off to be executed
//
// Collection<Subscription> subscriptions = value.subscriptions;
// Object message = value.message1;
//// Class<? extends Object> messageClass = message.getClass();
//
//// if (messageClass.equals(DeadMessage.class)) {
//// for (Subscription sub : subscriptions) {
//// sub.publishToSubscription(mbassador, message);
//// }
//// } else {
//// Object[] vararg = null;
//
// for (Subscription sub : subscriptions) {
//// boolean handled = false;
//// if (sub.isVarArg()) {
//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
//// if (vararg == null) {
//// // messy, but the ONLY way to do it.
//// vararg = (Object[]) Array.newInstance(messageClass, 1);
//// vararg[0] = message;
////
//// Object[] newInstance = new Object[1];
//// newInstance[0] = vararg;
//// vararg = newInstance;
//// }
////
//// handled = true;
//// sub.publishToSubscription(mbassador, vararg);
//// }
////
//// if (!handled) {
// sub.publishToSubscription(mbassador, message);
//// }
// }
//// }
//
//// pool2.release(holder);
//
// counter = DEFAULT_RETRIES;
// } else {
// if (counter > 100) {
// --counter;
// } else if (counter > 0) {
// --counter;
// Thread.yield();
// } else {
// LockSupport.parkNanos(1L);
// }
// }
//
//// handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",holder));
// }
// }
// };
//
// Thread runner = invokerThreadFactory.newThread(runnable);
// this.invokeRunners.add(runnable);
// runner.start();
// }
try {
while (true) {
IN_queue.take().run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
};
Thread runner = invokeThreadFactory.newThread(runnable);
this.threads.add(runner);
runner.start();
}
@ -345,22 +256,22 @@ public class MultiMBassador implements IMessageBus {
//////////////////////////// ////////////////////////////
PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); // PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this);
//
this.dispatch_Disruptor = new Disruptor<DispatchHolder>(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, // this.dispatch_Disruptor = new Disruptor<DispatchHolder>(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor,
ProducerType.MULTI, new SleepingWaitStrategy()); // ProducerType.MULTI, new SleepingWaitStrategy());
// this.invoke_Disruptor = new Disruptor<MessageHolder>(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor, // this.invoke_Disruptor = new Disruptor<MessageHolder>(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor,
// ProducerType.MULTI, new SleepingWaitStrategy()); // ProducerType.MULTI, new SleepingWaitStrategy());
//
//
this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); // this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer();
// this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer(); // this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer();
//
// not too many handlers, so we don't contend the locks in the subscription manager // // not too many handlers, so we don't contend the locks in the subscription manager
WorkHandler<DispatchHolder> dispatchHandlers[] = new DispatchProcessor[dispatchSize]; // EventHandler<DispatchHolder> dispatchHandlers[] = new DispatchProcessor[4];
for (int i = 0; i < dispatchHandlers.length; i++) { // for (int i = 0; i < dispatchHandlers.length; i++) {
dispatchHandlers[i] = new DispatchProcessor(i, dispatchHandlers.length, this.invoke_Executor); // dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue);
} // }
// WorkHandler<MessageHolder> invokeHandlers[] = new InvokeProcessor[numberOfThreads]; // WorkHandler<MessageHolder> invokeHandlers[] = new InvokeProcessor[numberOfThreads];
// for (int i = 0; i < invokeHandlers.length; i++) { // for (int i = 0; i < invokeHandlers.length; i++) {
@ -368,12 +279,12 @@ public class MultiMBassador implements IMessageBus {
// } // }
// //
// this.dispatch_Disruptor.handleEventsWith(dispatchHandlers); // this.dispatch_Disruptor.handleEventsWith(dispatchHandlers);
this.dispatch_WorkerPool = new WorkerPool<DispatchHolder>(this.dispatch_RingBuffer, // this.invoke_WorkerPool = new WorkerPool<MessageHolder>(this.invoke_RingBuffer,
this.dispatch_RingBuffer.newBarrier(), // this.invoke_RingBuffer.newBarrier(),
loggingExceptionHandler, // loggingExceptionHandler,
dispatchHandlers); // invokeHandlers);
// //
this.dispatch_RingBuffer.addGatingSequences(this.dispatch_WorkerPool.getWorkerSequences()); // this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences());
///////////////////////////////// /////////////////////////////////
// //
@ -457,9 +368,9 @@ public class MultiMBassador implements IMessageBus {
} }
public final MultiMBassador start() { public final MultiMBassador start() {
this.dispatch_WorkerPool.start(this.dispatch_Executor); // this.invoke_WorkerPool.start(this.invoke_Executor);
// this.invoke_Disruptor.start(); // this.invoke_Disruptor.start();
this.dispatch_Disruptor.start(); // this.dispatch_Disruptor.start();
return this; return this;
} }
@ -491,24 +402,24 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public boolean hasPendingMessages() { public boolean hasPendingMessages() {
return this.pendingMessages.get() > 0L;
// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; // return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize;
// return !this.dispatchQueue.isEmpty(); return !(this.dispatchQueue.isEmpty() && this.invokeQueue.isEmpty());
} }
@Override @Override
public void shutdown() { public void shutdown() {
// for (InterruptRunnable runnable : this.dispatchRunners) { for (Thread t : this.threads) {
// runnable.stop(); t.interrupt();
// } }
// System.err.println(this.counter);
// for (InterruptRunnable runnable : this.invokeRunners) { // for (InterruptRunnable runnable : this.invokeRunners) {
// runnable.stop(); // runnable.stop();
// } // }
this.dispatch_Disruptor.shutdown(); // this.dispatch_Disruptor.shutdown();
this.dispatch_Executor.shutdown(); // this.dispatch_Executor.shutdown();
this.invoke_Executor.shutdown();
} }
@ -785,112 +696,63 @@ public class MultiMBassador implements IMessageBus {
// } // }
} }
private final AtomicLong pendingMessages = new AtomicLong(0);
@Override @Override
public void publishAsync(final Object message) { public void publishAsync(Object message) {
if (message != null) { if (message != null) {
// put this on the disruptor ring buffer // // put this on the disruptor ring buffer
final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer; // final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
Runnable runnable = new Runnable() {
@Override
public void run() {
// System.err.println("invoke");
Object localMessage = message;
Class<?> messageClass = localMessage.getClass();
SubscriptionManager subscriptionManager = MultiMBassador.this.subscriptionManager;
Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = MultiMBassador.this.deadMessageCache.get();
localMessage = new DeadMessage(message);
empty = subscriptions.isEmpty();
}
if (!empty) {
for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (sub.isVarArg()) {
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(messageClass, 1);
// vararg[0] = message;
// //
// Object[] newInstance = new Object[1]; // // setup the job
// newInstance[0] = vararg; // final long seq = ringBuffer.next();
// vararg = newInstance; // try {
// } // DispatchHolder eventJob = ringBuffer.get(seq);
// // eventJob.messageType = MessageType.ONE;
// handled = true; // eventJob.message1 = message;
// sub.publishToSubscription(mbassador, vararg); // } catch (Throwable e) {
// } // handlePublicationError(new PublicationError()
// // .setMessage("Error while adding an asynchronous message")
// if (!handled) { // .setCause(e)
sub.publishToSubscription(MultiMBassador.this, localMessage); // .setPublishedObject(message));
// } // } finally {
} // // always publish the job
} // ringBuffer.publish(seq);
// }
MultiMBassador.this.pendingMessages.getAndDecrement();
}
};
DispatchHolder eventJob = ringBuffer.get(seq);
eventJob.runnable = runnable;
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
this.pendingMessages.getAndIncrement();
// System.err.println("adding " + this.pendingMessages.getAndIncrement());
// MessageHolder messageHolder = new MessageHolder(); // MessageHolder messageHolder = new MessageHolder();
// messageHolder.messageType = MessageType.ONE; // messageHolder.messageType = MessageType.ONE;
// messageHolder.message1 = message; // messageHolder.message1 = message;
// new Runnable() {
// @Override
// public void run() {
//
// }
// };
// try { // faster if we can skip locking
// this.dispatchQueue.transfer(message); // int counter = 200;
// // while (!this.dispatchQueue.offer(message)) {
//// int counter = 200; // if (counter > 100) {
//// while (!this.dispatchQueue.offer(messageHolder)) { // --counter;
//// if (counter > 100) { // Thread.yield();
//// --counter; // } else if (counter > 0) {
//// } else if (counter > 0) { // --counter;
//// --counter; // LockSupport.parkNanos(1L);
//// Thread.yield(); // } else {
//// } else { try {
//// LockSupport.parkNanos(1L); this.dispatchQueue.transfer(message);
//// } return;
//// } } catch (InterruptedException e) {
// e.printStackTrace();
// // log.error(e);
// } catch (InterruptedException e) {
// e.printStackTrace(); handlePublicationError(new PublicationError()
// // log.error(e); .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// handlePublicationError(new PublicationError() .setPublishedObject(message));
// .setMessage("Error while adding an asynchronous message") }
// .setCause(e) // }
// .setPublishedObject(message));
// } // }
} }
} }

View File

@ -2,7 +2,8 @@ package net.engio.mbassy.multi.common;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
/** /**
* This data structure is optimized for non-blocking reads even when write operations occur. * This data structure is optimized for non-blocking reads even when write operations occur.
@ -16,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> { public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
// Internal state // Internal state
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected Entry<T> head; // reference to the first element protected Entry<T> head; // reference to the first element
@ -66,6 +67,7 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
return this.entries.size(); return this.entries.size();
} }
@Override
public boolean isEmpty() { public boolean isEmpty() {
return this.head == null; return this.head == null;
} }
@ -90,29 +92,38 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
*/ */
@Override @Override
public boolean remove(T element) { public boolean remove(T element) {
if (!contains(element)) {
// return quickly Lock updateLock = this.lock.updateLock();
return false; boolean isNull;
} else { try {
Lock writeLock = this.lock.writeLock(); updateLock.lock();
try { ISetEntry<T> entry = this.entries.get(element);
writeLock.lock();
ISetEntry<T> listelement = this.entries.get(element); isNull = entry == null || entry.getValue() == null;
if (listelement == null) { if (isNull) {
return false; //removed by other thread in the meantime Lock writeLock = this.lock.writeLock();
try {
writeLock.lock();
ISetEntry<T> listelement = this.entries.get(element);
if (listelement == null) {
return false; //removed by other thread in the meantime
} else if (listelement != this.head) {
listelement.remove();
} else {
// if it was second, now it's first
this.head = this.head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
this.entries.remove(element);
return true;
} finally {
writeLock.unlock();
} }
if (listelement != this.head) { } else {
listelement.remove(); return false; // fast exit
} else {
// if it was second, now it's first
this.head = this.head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
this.entries.remove(element);
} finally {
writeLock.unlock();
} }
return true; } finally {
updateLock.unlock();
} }
} }

View File

@ -42,20 +42,10 @@ public final class BoundedTransferQueue<E> extends AbstractQueue<E> implements T
@Override @Override
public boolean offer(E e) { public boolean offer(E e) {
// if(tryDecrementCapacity()) { if(tryDecrementCapacity()) {
// return this._queue.offer(e); return this._queue.offer(e);
// } }
// return false; return false;
try {
if (tryDecrementCapacity()) {
this._queue.put(e);
} else {
this._queue.transfer(e);
this._remainingCapacity.decrementAndGet();
}
} catch (InterruptedException e2) {}
return true;
} }
@Override @Override

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import java.util.HashMap; import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
/** /**
@ -9,11 +9,11 @@ import java.util.Iterator;
* @author bennidi * @author bennidi
* Date: 2/12/12 * Date: 2/12/12
*/ */
public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T>{ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public StrongConcurrentSet() { public StrongConcurrentSet() {
super(new HashMap<T, ISetEntry<T>>()); super(new IdentityHashMap<T, ISetEntry<T>>());
} }
@Override @Override

View File

@ -9,21 +9,24 @@ package net.engio.mbassy.multi.disruptor;
* @author dorkbox, llc * @author dorkbox, llc
* Date: 2/2/15 * Date: 2/2/15
*/ */
final public class DeadMessage { public final class DeadMessage {
public Object[] relatedMessages = new Object[3]; private Object[] relatedMessages;
public DeadMessage(Object message) { public DeadMessage(Object message) {
this.relatedMessages = new Object[1];
this.relatedMessages[0] = message; this.relatedMessages[0] = message;
} }
public DeadMessage(Object message1, Object message2) { public DeadMessage(Object message1, Object message2) {
this.relatedMessages = new Object[2];
this.relatedMessages[0] = message1; this.relatedMessages[0] = message1;
this.relatedMessages[1] = message2; this.relatedMessages[1] = message2;
} }
public DeadMessage(Object message1, Object message2, Object message3 ) { public DeadMessage(Object message1, Object message2, Object message3 ) {
this.relatedMessages = new Object[3];
this.relatedMessages[0] = message1; this.relatedMessages[0] = message1;
this.relatedMessages[1] = message2; this.relatedMessages[1] = message2;
this.relatedMessages[2] = message3; this.relatedMessages[2] = message3;

View File

@ -6,8 +6,12 @@ package net.engio.mbassy.multi.disruptor;
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class DispatchHolder { public class DispatchHolder {
public MessageType messageType = MessageType.ONE;
public Runnable runnable = null; public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public DispatchHolder() { public DispatchHolder() {
} }

View File

@ -1,41 +1,46 @@
package net.engio.mbassy.multi.disruptor; package net.engio.mbassy.multi.disruptor;
import java.util.concurrent.ExecutorService; import java.util.Collection;
import java.util.Queue;
import com.lmax.disruptor.WorkHandler; import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import com.lmax.disruptor.EventHandler;
/** /**
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
*/ */
public class DispatchProcessor implements WorkHandler<DispatchHolder> { public class DispatchProcessor implements EventHandler<DispatchHolder> {
private final MultiMBassador publisher;
private final long ordinal; private final long ordinal;
private final long numberOfConsumers; private final long numberOfConsumers;
private final ExecutorService invoke_Executor; private final SubscriptionManager subscriptionManager;
// private final RingBuffer<MessageHolder> invoke_RingBuffer;
private final Queue<MessageHolder> queue;
public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers,
final SubscriptionManager subscriptionManager, Queue<MessageHolder> queue) {
this.publisher = publisher;
public DispatchProcessor(final long ordinal, final long numberOfConsumers,
final ExecutorService invoke_Executor) {
this.ordinal = ordinal; this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers; this.numberOfConsumers = numberOfConsumers;
this.invoke_Executor = invoke_Executor; this.subscriptionManager = subscriptionManager;
this.queue = queue;
} }
@Override @Override
public void onEvent(DispatchHolder event) throws Exception { public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception {
// if (sequence % this.numberOfConsumers == this.ordinal) { if (sequence % this.numberOfConsumers == this.ordinal) {
// System.err.println("handoff -" + this.ordinal);
this.invoke_Executor.submit(event.runnable);
// event.runnable.run();
// Process the event // Process the event
// switch (event.messageType) { // switch (event.messageType) {
// case ONE: { // case ONE: {
// publish(event.message1); publish(event.message1);
// event.message1 = null; // cleanup event.message1 = null; // cleanup
// return; // return;
// } // }
// case TWO: { // case TWO: {
@ -58,75 +63,75 @@ public class DispatchProcessor implements WorkHandler<DispatchHolder> {
// } // }
// } // }
// } }
} }
// private void publish(Object message) { private void publish(Object message) {
// Class<?> messageClass = message.getClass(); Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
try {
boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
message = new DeadMessage(message);
empty = subscriptions.isEmpty();
}
if (!empty) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.invoke_RingBuffer;
// //
// SubscriptionManager manager = this.subscriptionManager; // // setup the job
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); // final long seq = ringBuffer.next();
// // try {
// try { // MessageHolder eventJob = ringBuffer.get(seq);
// boolean empty = subscriptions.isEmpty(); // eventJob.messageType = MessageType.ONE;
// if (empty) { // eventJob.message1 = message;
// // Dead Event // eventJob.subscriptions = subscriptions;
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); // } catch (Throwable e) {
// // this.publisher.handlePublicationError(new PublicationError()
// message = new DeadMessage(message); // .setMessage("Error while adding an asynchronous message")
// // .setCause(e)
// empty = subscriptions.isEmpty(); // .setPublishedObject(message));
// } // } finally {
// // // always publish the job
// if (!empty) { // ringBuffer.publish(seq);
//// // put this on the disruptor ring buffer // }
//// final RingBuffer<MessageHolder> ringBuffer = this.invoke_RingBuffer;
////
//// // setup the job
//// final long seq = ringBuffer.next(); // // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are
//// try { // ObjectPoolHolder<MessageHolder> messageHolder = this.pool.take();
//// MessageHolder eventJob = ringBuffer.get(seq); // MessageHolder value = messageHolder.getValue();
//// eventJob.messageType = MessageType.ONE; MessageHolder messageHolder = new MessageHolder();
//// eventJob.message1 = message; messageHolder.subscriptions= subscriptions;
//// eventJob.subscriptions = subscriptions; messageHolder.messageType = MessageType.ONE;
//// } catch (Throwable e) { messageHolder.message1 = message;
//// this.publisher.handlePublicationError(new PublicationError()
//// .setMessage("Error while adding an asynchronous message") // this.queue.put(messageHolder);
//// .setCause(e)
//// .setPublishedObject(message)); // int counter = 200;
//// } finally { // while (!this.queue.offer(messageHolder)) {
//// // always publish the job // if (counter > 100) {
//// ringBuffer.publish(seq); // --counter;
//// } // } else if (counter > 0) {
// // --counter;
// // Thread.yield();
// // } else {
//// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are // LockSupport.parkNanos(1L);
//// ObjectPoolHolder<MessageHolder> messageHolder = this.pool.take(); // }
//// MessageHolder value = messageHolder.getValue(); // }
// MessageHolder messageHolder = new MessageHolder(); }
// messageHolder.subscriptions= subscriptions; } catch (Throwable e) {
// messageHolder.messageType = MessageType.ONE; this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e)
// messageHolder.message1 = message; .setPublishedObject(message));
// }
//// this.queue.put(messageHolder); }
//
//// int counter = 200;
//// while (!this.queue.offer(messageHolder)) {
//// if (counter > 100) {
//// --counter;
//// } else if (counter > 0) {
//// --counter;
//// Thread.yield();
//// } else {
//// LockSupport.parkNanos(1L);
//// }
//// }
// }
// } catch (Throwable e) {
// this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e)
// .setPublishedObject(message));
// }
// }
} }

View File

@ -34,11 +34,18 @@ public class Subscription {
private final MessageHandler handlerMetadata; private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation; private final IHandlerInvocation invocation;
// protected final Collection<Object> listeners;
// protected final Map<WeakReference<Object>, Boolean> listeners;
// protected final Map<Object, Boolean> listeners;
protected final IConcurrentSet<Object> listeners; protected final IConcurrentSet<Object> listeners;
Subscription(MessageHandler handler) { Subscription(MessageHandler handler) {
// this.listeners = new WeakConcurrentSet<Object>(); // this.listeners = new WeakConcurrentSet<Object>();
this.listeners = new StrongConcurrentSet<Object>(); this.listeners = new StrongConcurrentSet<Object>();
// this.listeners = new ConcurrentHashMap<Object, Boolean>();
// this.listeners = new CopyOnWriteArrayList<Object>();
// this.listeners = new ConcurrentSkipListSet<Object>();
// this.listeners = new ConcurrentWeakHashMap<WeakReference<Object>, Boolean>();
this.handlerMetadata = handler; this.handlerMetadata = handler;
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
@ -51,23 +58,12 @@ public class Subscription {
/** /**
* Check whether this subscription manages a message handler of the given message listener class * Check whether this subscription manages a message handler of the given message listener class
*
* @param listener
* @return
*/ */
// only in unit test
public boolean belongsTo(Class<?> listener){ public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener); return this.handlerMetadata.isFromListener(listener);
} }
/**
* Check whether this subscriptions manages the given listener instance
* @param listener
* @return
*/
public boolean contains(Object listener){
return this.listeners.contains(listener);
}
/** Check if this subscription permits sending objects as a VarArg (variable argument) */ /** Check if this subscription permits sending objects as a VarArg (variable argument) */
public boolean isVarArg() { public boolean isVarArg() {
return this.handlerMetadata.isVarArg(); return this.handlerMetadata.isVarArg();
@ -105,19 +101,24 @@ public class Subscription {
return this.handlerMetadata.handlesMessage(messageTypes); return this.handlerMetadata.handlesMessage(messageTypes);
} }
public Class<?>[] getHandledMessageTypes(){ public Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages(); return this.handlerMetadata.getHandledMessages();
} }
public void subscribe(Object listener) { public void subscribe(Object listener) {
// this.listeners.put(listener, Boolean.TRUE);
this.listeners.add(listener); this.listeners.add(listener);
} }
/** /**
* @return TRUE if the element was removed * @return TRUE if the element was removed
*/ */
public boolean unsubscribe(Object existingListener) { public boolean unsubscribe(Object existingListener) {
// Boolean remove = this.listeners.remove(existingListener);
// if (remove != null) {
// return true;
// }
// return false;
return this.listeners.remove(existingListener); return this.listeners.remove(existingListener);
} }
@ -125,15 +126,22 @@ public class Subscription {
return this.listeners.isEmpty(); return this.listeners.isEmpty();
} }
// only used in unit-test
public int size() { public int size() {
return this.listeners.size(); return this.listeners.size();
} }
// private AtomicLong counter = new AtomicLong();
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
if (this.listeners.size() > 0) { // Collection<Object> listeners = this.listeners.keySet();
Method handler = this.handlerMetadata.getHandler(); // Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners;
for (Object listener : this.listeners) { if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler();
// int count = 0;
for (Object listener : listeners) {
// count++;
try { try {
this.invocation.invoke(listener, handler, message); this.invocation.invoke(listener, handler, message);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -171,14 +179,19 @@ public class Subscription {
.setPublishedObject(message)); .setPublishedObject(message));
} }
} }
// this.counter.getAndAdd(count);
} }
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
if (this.listeners.size() > 0) { // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners;
if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler(); Method handler = this.handlerMetadata.getHandler();
for (Object listener : this.listeners) { for (Object listener : listeners) {
try { try {
this.invocation.invoke(listener, handler, message1, message2); this.invocation.invoke(listener, handler, message1, message2);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -224,10 +237,14 @@ public class Subscription {
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
if (this.listeners.size() > 0) { // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners;
if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler(); Method handler = this.handlerMetadata.getHandler();
for (Object listener : this.listeners) { for (Object listener : listeners) {
try { try {
this.invocation.invoke(listener, handler, message1, message2, message3); this.invocation.invoke(listener, handler, message1, message2, message3);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -275,10 +292,14 @@ public class Subscription {
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) {
if (this.listeners.size() > 0) { // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners;
if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler(); Method handler = this.handlerMetadata.getHandler();
for (Object listener : this.listeners) { for (Object listener : listeners) {
try { try {
this.invocation.invoke(listener, handler, messages); this.invocation.invoke(listener, handler, messages);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {

View File

@ -13,15 +13,11 @@ import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.IdentityObjectTree;
import net.engio.mbassy.multi.common.ReflectionUtils; import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.SubscriptionPoolable;
import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MetadataReader; import net.engio.mbassy.multi.listener.MetadataReader;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
import dorkbox.util.objectPool.ObjectPool;
import dorkbox.util.objectPool.ObjectPoolFactory;
/** /**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known * It provides fast lookup of existing subscriptions when another instance of an already known
@ -35,13 +31,18 @@ import dorkbox.util.objectPool.ObjectPoolFactory;
*/ */
public class SubscriptionManager { public class SubscriptionManager {
public static class SubHolder {
public int count = 0;
public Collection<Subscription> subs = new ArrayDeque<Subscription>(0);
}
// the metadata reader that is used to inspect objects passed to the subscribe method // the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader = new MetadataReader(); private final MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type // all subscriptions per message type
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time // write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50); private final Map<Class<?>, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, SubHolder>(50);
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>(); private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
@ -64,10 +65,7 @@ public class SubscriptionManager {
// synchronize read/write acces to the subscription maps // synchronize read/write acces to the subscription maps
private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
private ObjectPool<Collection<Subscription>> pool;
public SubscriptionManager() { public SubscriptionManager() {
this.pool = ObjectPoolFactory.create(new SubscriptionPoolable(), 1024);
} }
public void unsubscribe(Object listener) { public void unsubscribe(Object listener) {
@ -97,13 +95,16 @@ public class SubscriptionManager {
Class<?> clazz = handledMessageTypes[0]; Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz); SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz);
if (subs != null) { if (subHolder != null) {
subs.remove(subscription); Collection<Subscription> subs = subHolder.subs;
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) { if (subs.isEmpty()) {
// remove element // remove element
this.subscriptionsPerMessageSingle.remove(clazz); this.subscriptionsPerMessageSingle.remove(clazz);
}
} }
} }
} else { } else {
@ -203,12 +204,14 @@ public class SubscriptionManager {
Class<?> clazz = handledMessageTypes[0]; Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz); SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) { if (subHolder == null) {
subs = new ArrayList<Subscription>(); subHolder = new SubHolder();
this.subscriptionsPerMessageSingle.put(clazz, subs); this.subscriptionsPerMessageSingle.put(clazz, subHolder);
} }
Collection<Subscription> subs = subHolder.subs;
subs.add(subscription); subs.add(subscription);
subHolder.count++;
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
if (subscription.isVarArg()) { if (subscription.isVarArg()) {
@ -288,50 +291,61 @@ public class SubscriptionManager {
} }
} }
private final ThreadLocal<Collection<Subscription>> subscriptionCache = new ThreadLocal<Collection<Subscription>>() {
@Override
protected Collection<Subscription> initialValue()
{
return new ArrayDeque<Subscription>(16); // default
}
};
// obtain the set of subscriptions for the given message type // obtain the set of subscriptions for the given message type
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions = this.subscriptionCache.get(); Collection<Subscription> subscriptions;
subscriptions.clear();
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType); int count = 0;
if (subs != null) { Collection<Subscription> subs;
subscriptions.addAll(subs); SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType);
if (primaryHolder != null) {
subscriptions = new ArrayDeque<Subscription>(count);
subs = primaryHolder.subs;
count = primaryHolder.count;
if (subs != null) {
subscriptions.addAll(subs);
}
} else {
subscriptions = new ArrayDeque<Subscription>(16);
} }
// also add all subscriptions that match super types // also add all subscriptions that match super types
SubHolder subHolder;
ArrayList<Class<?>> types1 = superClassCache(messageType); ArrayList<Class<?>> types1 = superClassCache(messageType);
if (types1 != null) { if (types1 != null) {
Class<?> eventSuperType; Class<?> eventSuperType;
int i; int i;
for (i = 0; i < types1.size(); i++) { for (i = 0; i < types1.size(); i++) {
eventSuperType = types1.get(i); eventSuperType = types1.get(i);
subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subHolder != null) {
for (Subscription sub : subs) { subs = subHolder.subs;
if (sub.handlesMessageType(messageType)) { count += subHolder.count;
subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
} }
} }
} }
addVarArgClass(subscriptions, eventSuperType); count += addVarArgClass(subscriptions, eventSuperType);
} }
} }
addVarArgClass(subscriptions, messageType); count += addVarArgClass(subscriptions, messageType);
if (primaryHolder != null) {
// save off our count, so our collection creation size is optimal.
primaryHolder.count = count;
}
} finally { } finally {
this.LOCK.readLock().unlock(); this.LOCK.readLock().unlock();
} }
@ -487,6 +501,8 @@ public class SubscriptionManager {
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
int count = 16;
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageTypes); Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageTypes);
if (subs != null) { if (subs != null) {
@ -497,7 +513,7 @@ public class SubscriptionManager {
} }
} }
SubHolder subHolder;
int size = messageTypes.length; int size = messageTypes.length;
if (size > 0) { if (size > 0) {
boolean allSameType = true; boolean allSameType = true;
@ -536,10 +552,14 @@ public class SubscriptionManager {
eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subHolder != null) {
for (Subscription sub : subs) { subs = subHolder.subs;
subscriptions.add(sub); count += subHolder.count;
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
@ -547,6 +567,8 @@ public class SubscriptionManager {
} }
} }
} finally { } finally {
this.LOCK.readLock().unlock(); this.LOCK.readLock().unlock();
} }
@ -571,19 +593,27 @@ public class SubscriptionManager {
/////////////// ///////////////
// a var-arg handler might match // a var-arg handler might match
/////////////// ///////////////
private void addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) { private int addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) {
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
SubHolder subHolder;
Collection<Subscription> subs; Collection<Subscription> subs;
int count = 0;
Class<?> varArgClass = this.varArgClasses.get(messageType); Class<?> varArgClass = this.varArgClasses.get(messageType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(varArgClass); subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subs != null) { if (subHolder != null) {
for (Subscription sub : subs) { subs = subHolder.subs;
subscriptions.add(sub); count += subHolder.count;
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
return count;
} }
public Class<?> getVarArg(Class<?> clazz) { public Class<?> getVarArg(Class<?> clazz) {
@ -594,16 +624,23 @@ public class SubscriptionManager {
// a var-arg handler might match // a var-arg handler might match
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
/////////////// ///////////////
private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) { private int addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) {
Collection<Subscription> subs; Collection<Subscription> subs;
SubHolder subHolder;
int count = 0;
Class<?> varArgClass = this.varArgClasses.get(messageType); Class<?> varArgClass = this.varArgClasses.get(messageType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(varArgClass); subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subs != null) { if (subHolder != null) {
for (Subscription sub : subs) { subs = subHolder.subs;
subscriptions.add(sub); count += subHolder.count;
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
@ -612,14 +649,21 @@ public class SubscriptionManager {
varArgClass = this.varArgClasses.get(eventSuperType); varArgClass = this.varArgClasses.get(eventSuperType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(varArgClass); subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subs != null) { if (subHolder != null) {
for (Subscription sub : subs) { subs = subHolder.subs;
subscriptions.add(sub); count += subHolder.count;
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
} }
return count;
} }
private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index, private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index,