From 9b0e0c2bb6f29a6eade5acf5c0a57da8b559ec27 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 13 Feb 2015 17:19:10 +0100 Subject: [PATCH] wip, wrapping up --- .../engio/mbassy/multi/MultiMBassador.java | 566 +++--- .../multi/common/AbstractConcurrentSet.java | 57 +- .../multi/common/BoundedTransferQueue.java | 18 +- .../multi/common/ConcurrentWeakHashMap.java | 1529 +++++++++++++++++ .../multi/common/StrongConcurrentSet.java | 6 +- .../mbassy/multi/disruptor/DeadMessage.java | 7 +- .../multi/disruptor/DispatchHolder.java | 6 +- .../multi/disruptor/DispatchProcessor.java | 175 +- .../multi/subscription/Subscription.java | 67 +- .../subscription/SubscriptionManager.java | 158 +- 10 files changed, 2029 insertions(+), 560 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/multi/common/ConcurrentWeakHashMap.java diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index c9185e6..ca3af16 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -3,32 +3,17 @@ package net.engio.mbassy.multi; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import net.engio.mbassy.multi.common.BoundedTransferQueue; import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; +import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.disruptor.DeadMessage; -import net.engio.mbassy.multi.disruptor.DispatchFactory; -import net.engio.mbassy.multi.disruptor.DispatchHolder; -import net.engio.mbassy.multi.disruptor.DispatchProcessor; import net.engio.mbassy.multi.disruptor.MessageHolder; -import net.engio.mbassy.multi.disruptor.PublicationExceptionHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.SubscriptionManager; - -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.WorkHandler; -import com.lmax.disruptor.WorkerPool; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - import dorkbox.util.objectPool.PoolableObject; /** @@ -50,31 +35,30 @@ public class MultiMBassador implements IMessageBus { // any new thread will be 'NON-DAEMON', so that it will be forced to finish it's task before permitting the JVM to shut down - private final ExecutorService dispatch_Executor; - private final ExecutorService invoke_Executor; +// private final ExecutorService dispatch_Executor; +// private final ExecutorService invoke_Executor; -// private final TransferQueue dispatchQueue; // private final Queue dispatchQueue; // private final BlockingQueue dispatchQueue; + private final TransferQueue dispatchQueue; + private final TransferQueue invokeQueue; // private final SynchronousQueue dispatchQueue; // private Queue> mpmcArrayQueue; // all threads that are available for asynchronous message dispatching -// private List invokeRunners; -// private List dispatchRunners; + private List threads; // private dorkbox.util.objectPool.ObjectPool pool; // must be power of 2. For very high performance the ring buffer, and its contents, should fit in L3 CPU cache for exchanging between threads. - private final int dispatch_RingBufferSize = 2; +// private final int dispatch_RingBufferSize = 4; // private final int invoke_RingBufferSize = 2048; - private final Disruptor dispatch_Disruptor; - private final RingBuffer dispatch_RingBuffer; - private final WorkerPool dispatch_WorkerPool; +// private final Disruptor dispatch_Disruptor; +// private final RingBuffer dispatch_RingBuffer; // private final Disruptor invoke_Disruptor; // private final RingBuffer invoke_RingBuffer; @@ -90,16 +74,11 @@ public class MultiMBassador implements IMessageBus { } public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()); + this(Runtime.getRuntime().availableProcessors()*2); } - private final ThreadLocal deadMessageCache = new ThreadLocal() { - @Override - protected DeadMessage initialValue() - { - return new DeadMessage(null); - } - }; + +// private long counter = 0L; public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { @@ -107,7 +86,9 @@ public class MultiMBassador implements IMessageBus { } // this.objectQueue = new LinkedTransferQueue(); -// this.dispatchQueue = new LinkedTransferQueue(); + this.dispatchQueue = new LinkedTransferQueue(); + this.invokeQueue = new LinkedTransferQueue(); +// this.invokeQueue = new BoundedTransferQueue(numberOfThreads); // this.dispatchQueue = new BoundedTransferQueue(numberOfThreads); // this.dispatchQueue = new MpmcArrayQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new PTLQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); @@ -116,14 +97,13 @@ public class MultiMBassador implements IMessageBus { // this.dispatchQueue = new LinkedBlockingQueue(Pow2.roundToPowerOfTwo(numberOfThreads)); - int dispatchSize = 2; - this.dispatch_Executor = new ThreadPoolExecutor(dispatchSize, dispatchSize, 1L, TimeUnit.MINUTES, - new LinkedTransferQueue(), - new DisruptorThreadFactory("MB_Dispatch")); +// this.dispatch_Executor = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.MINUTES, +// new SynchronousQueue(), +// new DisruptorThreadFactory("MB_Dispatch")); - this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, - new BoundedTransferQueue(numberOfThreads), - new DisruptorThreadFactory("MB_Invoke")); +// this.invoke_Executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads*2, 1L, TimeUnit.MINUTES, +// new SynchronousQueue(), +// new DisruptorThreadFactory("MB_Invoke")); @@ -133,201 +113,132 @@ public class MultiMBassador implements IMessageBus { -// int dispatchSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); -// this.dispatchRunners = new ArrayList(dispatchSize); -// DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); -// for (int i = 0; i < dispatchSize; i++) { -// // each thread will run forever and process incoming message publication requests -// InterruptRunnable runnable = new InterruptRunnable() { -// private final ThreadLocal deadMessageCache = new ThreadLocal() { -// @Override -// protected DeadMessage initialValue() -// { -// return new DeadMessage(null); -// } -// }; + int dispatchSize = 2; +// int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads); + int invokeSize = numberOfThreads*2-dispatchSize; + this.threads = new ArrayList(dispatchSize + invokeSize); + + + DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); + for (int i = 0; i < dispatchSize; i++) { + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @Override + public void run() { + final MultiMBassador mbassador = MultiMBassador.this; + SubscriptionManager manager = mbassador.subscriptionManager; + final TransferQueue IN_queue = mbassador.dispatchQueue; + final TransferQueue OUT_queue = mbassador.invokeQueue; + + Object message = null; + while (true) { + try { + message = IN_queue.take(); + Class messageClass = message.getClass(); + + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + + boolean empty = subscriptions.isEmpty(); + if (empty) { + + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message); + message = deadMessage; + empty = subscriptions.isEmpty(); + } + + if (!empty) { + final Collection finalSubs = subscriptions; + final Object finalMessage = message; + Runnable e = new Runnable() { + @Override + public void run() { +// MultiMBassador mbassador = MultiMBassador.this; + Collection subs = finalSubs; + for (Subscription sub : subs) { +// boolean handled = false; +// if (sub.isVarArg()) { +// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method +// if (vararg == null) { +// // messy, but the ONLY way to do it. +// vararg = (Object[]) Array.newInstance(messageClass, 1); +// vararg[0] = message; // +// Object[] newInstance = new Object[1]; +// newInstance[0] = vararg; +// vararg = newInstance; +// } +// handled = true; +// sub.publishToSubscription(mbassador, vararg); +// } // -// @Override -// public void run() { -// final MultiMBassador mbassador = MultiMBassador.this; -// SubscriptionManager manager = mbassador.subscriptionManager; -// final TransferQueue IN_queue = mbassador.dispatchQueue; -//// final Queue OUT_queue = mbassador.invokeQueue; -// -// Object message = null; -//// int counter = 200; -// try { -// while (this.running) { -// message = IN_queue.take(); -//// value = IN_queue.poll(); -//// if (value != null) { -// Class messageClass = message.getClass(); -// -// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); -// -// try { -// boolean empty = subscriptions.isEmpty(); -// if (empty) { -// -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// -// DeadMessage deadMessage = this.deadMessageCache.get(); -// deadMessage.relatedMessages[0] = message; -// message = deadMessage; -// empty = subscriptions.isEmpty(); +// if (!handled) { + sub.publishToSubscription(mbassador, finalMessage); +// } + } + } + }; + + +// counter = 5; +// while (!OUT_queue.offer(e)) { +// if (counter > 3) { +// --counter; +// Thread.yield(); +// } else if (counter > 0) { +// --counter; +// Thread.yield(); +//// LockSupport.parkNanos(1L); +// } else { + OUT_queue.transfer(e); +// break; // } -// -// if (!empty) { -// for (Subscription sub : subscriptions) { -//// boolean handled = false; -//// if (sub.isVarArg()) { -//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -//// if (vararg == null) { -//// // messy, but the ONLY way to do it. -//// vararg = (Object[]) Array.newInstance(messageClass, 1); -//// vararg[0] = message; -// // -//// Object[] newInstance = new Object[1]; -//// newInstance[0] = vararg; -//// vararg = newInstance; -//// } -// // -//// handled = true; -//// sub.publishToSubscription(mbassador, vararg); -//// } -// // -//// if (!handled) { -// sub.publishToSubscription(mbassador, message); -//// } -// } -// } -//// counter = 200; -// } catch (Throwable e) { -// mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); // } -//// } else { -//// if (counter > 100) { -//// --counter; -//// } else if (counter > 0) { -//// --counter; -//// Thread.yield(); -//// } else { -//// LockSupport.parkNanos(1L); -//// } -//// } -// } -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// return; -// } -// } -// }; -// -// Thread runner = dispatchThreadFactory.newThread(runnable); -// this.dispatchRunners.add(runnable); -// runner.start(); -// } + +// OUT_queue.transfer(e); +// OUT_queue.put(e); + } + } catch (InterruptedException e) { + return; + } catch (Throwable e) { + mbassador.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); + } + } + } + }; + + Thread runner = dispatchThreadFactory.newThread(runnable); + this.threads.add(runner); + runner.start(); + } ////////////////////////////////////////////////////// -// this.invokeRunners = new ArrayList(numberOfThreads*2); -// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); -// for (int i = 0; i < numberOfThreads; i++) { -// // each thread will run forever and process incoming message publication requests -// InterruptRunnable runnable = new InterruptRunnable() { -// @Override -// public void run() { -// -// } -// }; -// } - - - -// this.invokeRunners = new ArrayList(numberOfThreads*2); -// DisruptorThreadFactory invokerThreadFactory = new DisruptorThreadFactory("MB_Invoke"); -// for (int i = 0; i < numberOfThreads; i++) { -// // each thread will run forever and process incoming message publication requests -// InterruptRunnable runnable = new InterruptRunnable() { -// @Override -// public void run() { -// final int DEFAULT_RETRIES = 200; -// final MultiMBassador mbassador = MultiMBassador.this; -// final Queue queue = mbassador.invokeQueue; -//// final SubscriptionManager manager = mbassador.subscriptionManager; -//// final ObjectPool pool2 = mbassador.pool; -// -// int counter = DEFAULT_RETRIES; -//// ObjectPoolHolder holder = null; -// MessageHolder value = null; -// -// while (this.running) { -// value = queue.poll(); -// if (value != null) { -// // off to be executed -// -// Collection subscriptions = value.subscriptions; -// Object message = value.message1; -//// Class messageClass = message.getClass(); -// -//// if (messageClass.equals(DeadMessage.class)) { -//// for (Subscription sub : subscriptions) { -//// sub.publishToSubscription(mbassador, message); -//// } -//// } else { -//// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -//// boolean handled = false; -//// if (sub.isVarArg()) { -//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -//// if (vararg == null) { -//// // messy, but the ONLY way to do it. -//// vararg = (Object[]) Array.newInstance(messageClass, 1); -//// vararg[0] = message; -//// -//// Object[] newInstance = new Object[1]; -//// newInstance[0] = vararg; -//// vararg = newInstance; -//// } -//// -//// handled = true; -//// sub.publishToSubscription(mbassador, vararg); -//// } -//// -//// if (!handled) { -// sub.publishToSubscription(mbassador, message); -//// } -// } -//// } -// -//// pool2.release(holder); -// -// counter = DEFAULT_RETRIES; -// } else { -// if (counter > 100) { -// --counter; -// } else if (counter > 0) { -// --counter; -// Thread.yield(); -// } else { -// LockSupport.parkNanos(1L); -// } -// } -// -//// handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",holder)); -// } -// } -// }; -// -// Thread runner = invokerThreadFactory.newThread(runnable); -// this.invokeRunners.add(runnable); -// runner.start(); -// } - + DisruptorThreadFactory invokeThreadFactory = new DisruptorThreadFactory("MB_Invoke"); + for (int i = 0; i < invokeSize; i++) { + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @Override + public void run() { + final MultiMBassador mbassador = MultiMBassador.this; + final TransferQueue IN_queue = mbassador.invokeQueue; + try { + while (true) { + IN_queue.take().run(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + }; + Thread runner = invokeThreadFactory.newThread(runnable); + this.threads.add(runner); + runner.start(); + } @@ -345,22 +256,22 @@ public class MultiMBassador implements IMessageBus { //////////////////////////// - PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); - - this.dispatch_Disruptor = new Disruptor(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, - ProducerType.MULTI, new SleepingWaitStrategy()); +// PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(this); +// +// this.dispatch_Disruptor = new Disruptor(new DispatchFactory(), this.dispatch_RingBufferSize, this.dispatch_Executor, +// ProducerType.MULTI, new SleepingWaitStrategy()); // this.invoke_Disruptor = new Disruptor(new InvokeFactory(), this.invoke_RingBufferSize, this.invoke_Executor, // ProducerType.MULTI, new SleepingWaitStrategy()); - - - this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); +// +// +// this.dispatch_RingBuffer = this.dispatch_Disruptor.getRingBuffer(); // this.invoke_RingBuffer = this.invoke_Disruptor.getRingBuffer(); - - // not too many handlers, so we don't contend the locks in the subscription manager - WorkHandler dispatchHandlers[] = new DispatchProcessor[dispatchSize]; - for (int i = 0; i < dispatchHandlers.length; i++) { - dispatchHandlers[i] = new DispatchProcessor(i, dispatchHandlers.length, this.invoke_Executor); - } +// +// // not too many handlers, so we don't contend the locks in the subscription manager +// EventHandler dispatchHandlers[] = new DispatchProcessor[4]; +// for (int i = 0; i < dispatchHandlers.length; i++) { +// dispatchHandlers[i] = new DispatchProcessor(this, i, dispatchHandlers.length, this.subscriptionManager, this.invokeQueue); +// } // WorkHandler invokeHandlers[] = new InvokeProcessor[numberOfThreads]; // for (int i = 0; i < invokeHandlers.length; i++) { @@ -368,12 +279,12 @@ public class MultiMBassador implements IMessageBus { // } // // this.dispatch_Disruptor.handleEventsWith(dispatchHandlers); - this.dispatch_WorkerPool = new WorkerPool(this.dispatch_RingBuffer, - this.dispatch_RingBuffer.newBarrier(), - loggingExceptionHandler, - dispatchHandlers); +// this.invoke_WorkerPool = new WorkerPool(this.invoke_RingBuffer, +// this.invoke_RingBuffer.newBarrier(), +// loggingExceptionHandler, +// invokeHandlers); // - this.dispatch_RingBuffer.addGatingSequences(this.dispatch_WorkerPool.getWorkerSequences()); +// this.invoke_RingBuffer.addGatingSequences(this.invoke_WorkerPool.getWorkerSequences()); ///////////////////////////////// // @@ -457,9 +368,9 @@ public class MultiMBassador implements IMessageBus { } public final MultiMBassador start() { - this.dispatch_WorkerPool.start(this.dispatch_Executor); +// this.invoke_WorkerPool.start(this.invoke_Executor); // this.invoke_Disruptor.start(); - this.dispatch_Disruptor.start(); +// this.dispatch_Disruptor.start(); return this; } @@ -491,24 +402,24 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { - return this.pendingMessages.get() > 0L; // return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; -// return !this.dispatchQueue.isEmpty(); + return !(this.dispatchQueue.isEmpty() && this.invokeQueue.isEmpty()); } @Override public void shutdown() { -// for (InterruptRunnable runnable : this.dispatchRunners) { -// runnable.stop(); -// } + for (Thread t : this.threads) { + t.interrupt(); + } + +// System.err.println(this.counter); // for (InterruptRunnable runnable : this.invokeRunners) { // runnable.stop(); // } - this.dispatch_Disruptor.shutdown(); - this.dispatch_Executor.shutdown(); - this.invoke_Executor.shutdown(); +// this.dispatch_Disruptor.shutdown(); +// this.dispatch_Executor.shutdown(); } @@ -785,112 +696,63 @@ public class MultiMBassador implements IMessageBus { // } } - private final AtomicLong pendingMessages = new AtomicLong(0); - @Override - public void publishAsync(final Object message) { + public void publishAsync(Object message) { if (message != null) { - // put this on the disruptor ring buffer - final RingBuffer ringBuffer = this.dispatch_RingBuffer; - - // setup the job - final long seq = ringBuffer.next(); - try { - Runnable runnable = new Runnable() { - @Override - public void run() { -// System.err.println("invoke"); - Object localMessage = message; - - Class messageClass = localMessage.getClass(); - - SubscriptionManager subscriptionManager = MultiMBassador.this.subscriptionManager; - Collection subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass); - - boolean empty = subscriptions.isEmpty(); - if (empty) { - // Dead Event - subscriptions = subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class); - -// DeadMessage deadMessage = MultiMBassador.this.deadMessageCache.get(); - localMessage = new DeadMessage(message); - empty = subscriptions.isEmpty(); - } - - if (!empty) { - for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; +// // put this on the disruptor ring buffer +// final RingBuffer ringBuffer = this.dispatch_RingBuffer; // -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(mbassador, vararg); -// } -// -// if (!handled) { - sub.publishToSubscription(MultiMBassador.this, localMessage); -// } - } - } - - MultiMBassador.this.pendingMessages.getAndDecrement(); - } - }; - - - DispatchHolder eventJob = ringBuffer.get(seq); - eventJob.runnable = runnable; - } catch (Throwable e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } finally { - // always publish the job - ringBuffer.publish(seq); - } - this.pendingMessages.getAndIncrement(); -// System.err.println("adding " + this.pendingMessages.getAndIncrement()); +// // setup the job +// final long seq = ringBuffer.next(); +// try { +// DispatchHolder eventJob = ringBuffer.get(seq); +// eventJob.messageType = MessageType.ONE; +// eventJob.message1 = message; +// } catch (Throwable e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); +// } finally { +// // always publish the job +// ringBuffer.publish(seq); +// } // MessageHolder messageHolder = new MessageHolder(); // messageHolder.messageType = MessageType.ONE; // messageHolder.message1 = message; +// new Runnable() { +// @Override +// public void run() { +// +// } +// }; -// try { -// this.dispatchQueue.transfer(message); -// -//// int counter = 200; -//// while (!this.dispatchQueue.offer(messageHolder)) { -//// if (counter > 100) { -//// --counter; -//// } else if (counter > 0) { -//// --counter; -//// Thread.yield(); -//// } else { -//// LockSupport.parkNanos(1L); -//// } -//// } -// -// -// } catch (InterruptedException e) { -// e.printStackTrace(); -// // log.error(e); -// -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); + // faster if we can skip locking +// int counter = 200; +// while (!this.dispatchQueue.offer(message)) { +// if (counter > 100) { +// --counter; +// Thread.yield(); +// } else if (counter > 0) { +// --counter; +// LockSupport.parkNanos(1L); +// } else { + try { + this.dispatchQueue.transfer(message); + return; + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } +// } // } } } diff --git a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java index f00120f..a21c471 100644 --- a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java @@ -2,7 +2,8 @@ package net.engio.mbassy.multi.common; import java.util.Map; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; /** * This data structure is optimized for non-blocking reads even when write operations occur. @@ -16,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class AbstractConcurrentSet implements IConcurrentSet { // Internal state - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); private final Map> entries; // maintain a map of entries for O(log n) lookup protected Entry head; // reference to the first element @@ -66,6 +67,7 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { return this.entries.size(); } + @Override public boolean isEmpty() { return this.head == null; } @@ -90,29 +92,38 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { */ @Override public boolean remove(T element) { - if (!contains(element)) { - // return quickly - return false; - } else { - Lock writeLock = this.lock.writeLock(); - try { - writeLock.lock(); - ISetEntry listelement = this.entries.get(element); - if (listelement == null) { - return false; //removed by other thread in the meantime + + Lock updateLock = this.lock.updateLock(); + boolean isNull; + try { + updateLock.lock(); + ISetEntry entry = this.entries.get(element); + + isNull = entry == null || entry.getValue() == null; + if (isNull) { + Lock writeLock = this.lock.writeLock(); + try { + writeLock.lock(); + ISetEntry listelement = this.entries.get(element); + if (listelement == null) { + return false; //removed by other thread in the meantime + } else if (listelement != this.head) { + listelement.remove(); + } else { + // if it was second, now it's first + this.head = this.head.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators + } + this.entries.remove(element); + return true; + } finally { + writeLock.unlock(); } - if (listelement != this.head) { - listelement.remove(); - } else { - // if it was second, now it's first - this.head = this.head.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - this.entries.remove(element); - } finally { - writeLock.unlock(); + } else { + return false; // fast exit } - return true; + } finally { + updateLock.unlock(); } } diff --git a/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java b/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java index e1f21c2..af13855 100644 --- a/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java +++ b/src/main/java/net/engio/mbassy/multi/common/BoundedTransferQueue.java @@ -42,20 +42,10 @@ public final class BoundedTransferQueue extends AbstractQueue implements T @Override public boolean offer(E e) { -// if(tryDecrementCapacity()) { -// return this._queue.offer(e); -// } -// return false; - - try { - if (tryDecrementCapacity()) { - this._queue.put(e); - } else { - this._queue.transfer(e); - this._remainingCapacity.decrementAndGet(); - } - } catch (InterruptedException e2) {} - return true; + if(tryDecrementCapacity()) { + return this._queue.offer(e); + } + return false; } @Override diff --git a/src/main/java/net/engio/mbassy/multi/common/ConcurrentWeakHashMap.java b/src/main/java/net/engio/mbassy/multi/common/ConcurrentWeakHashMap.java new file mode 100644 index 0000000..969e6b6 --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/common/ConcurrentWeakHashMap.java @@ -0,0 +1,1529 @@ +package net.engio.mbassy.multi.common; +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/* + * taken from http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/src/jsr166y/ + * and the grizzly project. + */ + +import java.io.IOException; +import java.io.Serializable; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.AbstractCollection; +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A hash table with weak keys, full concurrency of retrievals, and + * adjustable expected concurrency for updates. Similar to + * {@link java.util.WeakHashMap}, entries of this table are periodically + * removed once their corresponding keys are no longer referenced outside of + * this table. In other words, this table will not prevent a key from being + * discarded by the garbage collector. Once a key has been discarded by the + * collector, the corresponding entry is no longer visible to this table; + * however, the entry may occupy space until a future table operation decides to + * reclaim it. For this reason, summary functions such as size and + * isEmpty might return a value greater than the observed number of + * entries. In order to support a high level of concurrency, stale entries are + * only reclaimed during blocking (usually mutating) operations. + * + * While keys in this table are only held using a weak reference, values are + * held using a normal strong reference. This provides the guarantee that a + * value will always have at least the same life-span as it's key. For this + * reason, care should be taken to ensure that a value never refers, either + * directly or indirectly, to its key, thereby preventing reclamation. If weak + * values are desired, one can simply use a {@link WeakReference} for the value + * type. + * + * Just like {@link java.util.ConcurrentHashMap}, this class obeys the same + * functional specification as {@link java.util.Hashtable}, and includes + * versions of methods corresponding to each method of Hashtable. + * However, even though all operations are thread-safe, retrieval operations do + * not entail locking, and there is not any support for + * locking the entire table in a way that prevents all access. This class is + * fully interoperable with Hashtable in programs that rely on its + * thread safety but not on its synchronization details. + * + *

+ * Retrieval operations (including get) generally do not block, so + * may overlap with update operations (including put and + * remove). Retrievals reflect the results of the most recently + * completed update operations holding upon their onset. For + * aggregate operations such as putAll and clear, + * concurrent retrievals may reflect insertion or removal of only some entries. + * Similarly, Iterators and Enumerations return elements reflecting the state of + * the hash table at some point at or since the creation of the + * iterator/enumeration. They do not throw + * {@link ConcurrentModificationException}. However, iterators are designed to + * be used by only one thread at a time. + * + *

+ * The allowed concurrency among update operations is guided by the optional + * concurrencyLevel constructor argument (default 16), + * which is used as a hint for internal sizing. The table is internally + * partitioned to try to permit the indicated number of concurrent updates + * without contention. Because placement in hash tables is essentially random, + * the actual concurrency will vary. Ideally, you should choose a value to + * accommodate as many threads as will ever concurrently modify the table. Using + * a significantly higher value than you need can waste space and time, and a + * significantly lower value can lead to thread contention. But overestimates + * and underestimates within an order of magnitude do not usually have much + * noticeable impact. A value of one is appropriate when it is known that only + * one thread will modify and all others will only read. Also, resizing this or + * any other kind of hash table is a relatively slow operation, so, when + * possible, it is a good idea to provide estimates of expected table sizes in + * constructors. + * + *

+ * This class and its views and iterators implement all of the optional + * methods of the {@link Map} and {@link Iterator} interfaces. + * + *

+ * Like {@link Hashtable} but unlike {@link HashMap}, this class does + * not allow null to be used as a key or value. + * + *

+ * This class is a member of the + * Java Collections Framework. + * + * @author Doug Lea + * @author Jason T. Greene + * @param the type of keys maintained by this map + * @param the type of mapped values + */ +public class ConcurrentWeakHashMap extends AbstractMap + implements java.util.concurrent.ConcurrentMap, Serializable { + private static final long serialVersionUID = 7249069246763182397L; + + /* + * The basic strategy is to subdivide the table among Segments, + * each of which itself is a concurrently readable hash table. + */ + + /* ---------------- Constants -------------- */ + + /** + * The default initial capacity for this table, + * used when not otherwise specified in a constructor. + */ + static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default load factor for this table, used when not + * otherwise specified in a constructor. + */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * The default concurrency level for this table, used when not + * otherwise specified in a constructor. + */ + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The maximum capacity, used if a higher value is implicitly + * specified by either of the constructors with arguments. MUST + * be a power of two <= 1<<30 to ensure that entries are indexable + * using ints. + */ + static final int MAXIMUM_CAPACITY = 1 << 30; + /** + * The maximum number of segments to allow; used to bound + * constructor arguments. + */ + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + /** + * Number of unsynchronized retries in size and containsValue + * methods before resorting to locking. This is used to avoid + * unbounded retries if tables undergo continuous modification + * which would make it impossible to obtain an accurate result. + */ + static final int RETRIES_BEFORE_LOCK = 2; + /* ---------------- Fields -------------- */ + /** + * Mask value for indexing into segments. The upper bits of a + * key's hash code are used to choose the segment. + */ + final int segmentMask; + /** + * Shift value for indexing within segments. + */ + final int segmentShift; + /** + * The segments, each of which is a specialized hash table + */ + final Segment[] segments; + transient Set keySet; + transient Set> entrySet; + transient Collection values; + /* ---------------- Small Utilities -------------- */ + /** + * Applies a supplemental hash function to a given hashCode, which + * defends against poor quality hash functions. This is critical + * because ConcurrentWeakHashMap uses power-of-two length hash tables, + * that otherwise encounter collisions for hashCodes that do not + * differ in lower or upper bits. + */ + private static int hash(int h) { + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += h << 15 ^ 0xffffcd7d; + h ^= h >>> 10; + h += h << 3; + h ^= h >>> 6; + h += (h << 2) + (h << 14); + return h ^ h >>> 16; + } + /** + * Returns the segment that should be used for key with given hash + * @param hash the hash code for the key + * @return the segment + */ + final Segment segmentFor(int hash) { + return this.segments[hash >>> this.segmentShift & this.segmentMask]; + } + /* ---------------- Inner Classes -------------- */ + /** + * A weak-key reference which stores the key hash needed for reclamation. + */ + static final class WeakKeyReference extends WeakReference { + final int hash; + WeakKeyReference(K key, int hash, ReferenceQueue refQueue) { + super(key, refQueue); + this.hash = hash; + } + } + + /** + * ConcurrentWeakHashMap list entry. Note that this is never exported + * out as a user-visible Map.Entry. + * + * Because the value field is volatile, not final, it is legal wrt + * the Java Memory Model for an unsynchronized reader to see null + * instead of initial value when read via a data race. Although a + * reordering leading to this is not likely to ever actually + * occur, the Segment.readValueUnderLock method is used as a + * backup in case a null (pre-initialized) value is ever seen in + * an unsynchronized access method. + */ + static final class HashEntry { + final WeakReference keyRef; + final int hash; + volatile V value; + final HashEntry next; + HashEntry(K key, int hash, HashEntry next, V value, ReferenceQueue refQueue) { + this.keyRef = new WeakKeyReference(key, hash, refQueue); + this.hash = hash; + this.next = next; + this.value = value; + } + @SuppressWarnings("unchecked") + static final HashEntry[] newArray(int i) { + return new HashEntry[i]; + } + } + /** + * Segments are specialized versions of hash tables. This + * subclasses from ReentrantLock opportunistically, just to + * simplify some locking and avoid separate construction. + */ + static final class Segment extends ReentrantLock implements Serializable { + /* + * Segments maintain a table of entry lists that are ALWAYS + * kept in a consistent state, so can be read without locking. + * Next fields of nodes are immutable (final). All list + * additions are performed at the front of each bin. This + * makes it easy to check changes, and also fast to traverse. + * When nodes would otherwise be changed, new nodes are + * created to replace them. This works well for hash tables + * since the bin lists tend to be short. (The average length + * is less than two for the default load factor threshold.) + * + * Read operations can thus proceed without locking, but rely + * on selected uses of volatiles to ensure that completed + * write operations performed by other threads are + * noticed. For most purposes, the "count" field, tracking the + * number of elements, serves as that volatile variable + * ensuring visibility. This is convenient because this field + * needs to be read in many read operations anyway: + * + * - All (unsynchronized) read operations must first read the + * "count" field, and should not look at table entries if + * it is 0. + * + * - All (synchronized) write operations should write to + * the "count" field after structurally changing any bin. + * The operations must not take any action that could even + * momentarily cause a concurrent read operation to see + * inconsistent data. This is made easier by the nature of + * the read operations in Map. For example, no operation + * can reveal that the table has grown but the threshold + * has not yet been updated, so there are no atomicity + * requirements for this with respect to reads. + * + * As a guide, all critical volatile reads and writes to the + * count field are marked in code comments. + */ + private static final long serialVersionUID = 2249069246763182397L; + /** + * The number of elements in this segment's region. + */ + transient volatile int count; + /** + * Number of updates that alter the size of the table. This is + * used during bulk-read methods to make sure they see a + * consistent snapshot: If modCounts change during a traversal + * of segments computing size or checking containsValue, then + * we might have an inconsistent view of state so (usually) + * must retry. + */ + transient int modCount; + /** + * The table is rehashed when its size exceeds this threshold. + * (The value of this field is always (int)(capacity * + * loadFactor).) + */ + transient int threshold; + + /** + * The per-segment table. + */ + transient volatile HashEntry[] table; + + /** + * The load factor for the hash table. Even though this value + * is same for all segments, it is replicated to avoid needing + * links to outer object. + * @serial + */ + final float loadFactor; + + /** + * The collected weak-key reference queue for this segment. + * This should be (re)initialized whenever table is assigned, + */ + transient volatile ReferenceQueue refQueue; + + Segment(int initialCapacity, float lf) { + this.loadFactor = lf; + setTable(HashEntry.newArray(initialCapacity)); + } + + @SuppressWarnings("unchecked") + static final Segment[] newArray(int i) { + return new Segment[i]; + } + + /** + * Sets table to new HashEntry array. + * Call only while holding lock or in constructor. + */ + void setTable(HashEntry[] newTable) { + this.threshold = (int)(newTable.length * this.loadFactor); + this.table = newTable; + this.refQueue = new ReferenceQueue(); + } + + /** + * Returns properly casted first entry of bin for given hash. + */ + HashEntry getFirst(int hash) { + HashEntry[] tab = this.table; + return tab[hash & tab.length - 1]; + } + + /** + * Reads value field of an entry under lock. Called if value + * field ever appears to be null. This is possible only if a + * compiler happens to reorder a HashEntry initialization with + * its table assignment, which is legal under memory model + * but is not known to ever occur. + */ + V readValueUnderLock(HashEntry e) { + lock(); + try { + removeStale(); + return e.value; + } finally { + unlock(); + } + } + + /* Specialized implementations of map methods */ + + V get(Object key, int hash) { + if (this.count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && key.equals(e.keyRef.get())) { + V v = e.value; + if (v != null) { + return v; + } + return readValueUnderLock(e); // recheck + } + e = e.next; + } + } + return null; + } + + boolean containsKey(Object key, int hash) { + if (this.count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && key.equals(e.keyRef.get())) { + return true; + } + e = e.next; + } + } + return false; + } + + boolean containsValue(Object value) { + if (this.count != 0) { // read-volatile + HashEntry[] tab = this.table; + int len = tab.length; + for (int i = 0 ; i < len; i++) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + V v = e.value; + if (v == null) { + v = readValueUnderLock(e); + } + if (value.equals(v)) { + return true; + } + } + } + } + return false; + } + + boolean replace(K key, int hash, V oldValue, V newValue) { + lock(); + try { + removeStale(); + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !key.equals(e.keyRef.get()))) { + e = e.next; + } + + boolean replaced = false; + if (e != null && oldValue.equals(e.value)) { + replaced = true; + e.value = newValue; + } + return replaced; + } finally { + unlock(); + } + } + + V replace(K key, int hash, V newValue) { + lock(); + try { + removeStale(); + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !key.equals(e.keyRef.get()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + oldValue = e.value; + e.value = newValue; + } + return oldValue; + } finally { + unlock(); + } + } + + + V put(K key, int hash, V value, boolean onlyIfAbsent) { + lock(); + try { + removeStale(); + int c = this.count; + if (c++ > this.threshold) {// ensure capacity + int reduced = rehash(); + if (reduced > 0) { + this.count = (c -= reduced) - 1; // write-volatile + } + } + + HashEntry[] tab = this.table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + while (e != null && (e.hash != hash || !key.equals(e.keyRef.get()))) { + e = e.next; + } + + V oldValue; + if (e != null) { + oldValue = e.value; + if (!onlyIfAbsent) { + e.value = value; + } + } + else { + oldValue = null; + ++this.modCount; + tab[index] = new HashEntry(key, hash, first, value, this.refQueue); + this.count = c; // write-volatile + } + return oldValue; + } finally { + unlock(); + } + } + + int rehash() { + HashEntry[] oldTable = this.table; + int oldCapacity = oldTable.length; + if (oldCapacity >= MAXIMUM_CAPACITY) { + return 0; + } + + /* + * Reclassify nodes in each list to new Map. Because we are + * using power-of-two expansion, the elements from each bin + * must either stay at same index, or move with a power of two + * offset. We eliminate unnecessary node creation by catching + * cases where old nodes can be reused because their next + * fields won't change. Statistically, at the default + * threshold, only about one-sixth of them need cloning when + * a table doubles. The nodes they replace will be garbage + * collectable as soon as they are no longer referenced by any + * reader thread that may be in the midst of traversing table + * right now. + */ + + HashEntry[] newTable = HashEntry.newArray(oldCapacity<<1); + this.threshold = (int)(newTable.length * this.loadFactor); + int sizeMask = newTable.length - 1; + int reduce = 0; + for (int i = 0; i < oldCapacity ; i++) { + // We need to guarantee that any existing reads of old Map can + // proceed. So we cannot yet null out each bin. + HashEntry e = oldTable[i]; + + if (e != null) { + HashEntry next = e.next; + int idx = e.hash & sizeMask; + + // Single node on list + if (next == null) { + newTable[idx] = e; + } else { + // Reuse trailing consecutive sequence at same slot + HashEntry lastRun = e; + int lastIdx = idx; + for (HashEntry last = next; + last != null; + last = last.next) { + int k = last.hash & sizeMask; + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + newTable[lastIdx] = lastRun; + // Clone all remaining nodes + for (HashEntry p = e; p != lastRun; p = p.next) { + // Skip GC'd weak refs + K key = p.keyRef.get(); + if (key == null) { + reduce++; + continue; + } + int k = p.hash & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = new HashEntry(key, p.hash, n, p.value, this.refQueue); + } + } + } + } + this.table = newTable; + return reduce; + } + + /** + * Remove; match on key only if value null, else match both. + */ + V remove(Object key, int hash, Object value, boolean weakRemove) { + lock(); + try { + if (!weakRemove) { + removeStale(); + } + int c = this.count - 1; + HashEntry[] tab = this.table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + // a weak remove operation compares the WeakReference instance + while (e != null && (!weakRemove || key != e.keyRef) + && (e.hash != hash || !key.equals(e.keyRef.get()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + V v = e.value; + if (value == null || value.equals(v)) { + oldValue = v; + // All entries following removed node can stay + // in list, but all preceding ones need to be + // cloned. + ++this.modCount; + HashEntry newFirst = e.next; + for (HashEntry p = first; p != e; p = p.next) { + K pKey = p.keyRef.get(); + if (pKey == null) { // Skip GC'd keys + c--; + continue; + } + + newFirst = new HashEntry(pKey, p.hash, + newFirst, p.value, this.refQueue); + } + tab[index] = newFirst; + this.count = c; // write-volatile + } + } + return oldValue; + } finally { + unlock(); + } + } + + @SuppressWarnings("unchecked") + void removeStale() { + WeakKeyReference ref; + while ((ref = (WeakKeyReference) this.refQueue.poll()) != null) { + remove(ref, ref.hash, null, true); + } + } + + void clear() { + if (this.count != 0) { + lock(); + try { + HashEntry[] tab = this.table; + for (int i = 0; i < tab.length ; i++) { + tab[i] = null; + } + ++this.modCount; + // replace the reference queue to avoid unnecessary stale cleanups + this.refQueue = new ReferenceQueue(); + this.count = 0; // write-volatile + } finally { + unlock(); + } + } + } + } + + + + /* ---------------- Public operations -------------- */ + + /** + * Creates a new, empty map with the specified initial + * capacity, load factor and concurrency level. + * + * @param initialCapacity the initial capacity. The implementation + * performs internal sizing to accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of elements per + * bin exceeds this threshold. + * @param concurrencyLevel the estimated number of concurrently + * updating threads. The implementation performs internal sizing + * to try to accommodate this many threads. + * @throws IllegalArgumentException if the initial capacity is + * negative or the load factor or concurrencyLevel are + * nonpositive. + */ + public ConcurrentWeakHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { + if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) { + throw new IllegalArgumentException(); + } + + if (concurrencyLevel > MAX_SEGMENTS) { + concurrencyLevel = MAX_SEGMENTS; + } + + // Find power-of-two sizes best matching arguments + int sshift = 0; + int ssize = 1; + while (ssize < concurrencyLevel) { + ++sshift; + ssize <<= 1; + } + this.segmentShift = 32 - sshift; + this.segmentMask = ssize - 1; + this.segments = Segment.newArray(ssize); + + if (initialCapacity > MAXIMUM_CAPACITY) { + initialCapacity = MAXIMUM_CAPACITY; + } + int c = initialCapacity / ssize; + if (c * ssize < initialCapacity) { + ++c; + } + int cap = 1; + while (cap < c) { + cap <<= 1; + } + + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i] = new Segment(cap, loadFactor); + } + } + + /** + * Creates a new, empty map with the specified initial capacity + * and load factor and with the default concurrencyLevel (16). + * + * @param initialCapacity The implementation performs internal + * sizing to accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of elements per + * bin exceeds this threshold. + * @throws IllegalArgumentException if the initial capacity of + * elements is negative or the load factor is nonpositive + * + * @since 1.6 + */ + public ConcurrentWeakHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with the specified initial capacity, + * and with default load factor (0.75) and concurrencyLevel (16). + * + * @param initialCapacity the initial capacity. The implementation + * performs internal sizing to accommodate this many elements. + * @throws IllegalArgumentException if the initial capacity of + * elements is negative. + */ + public ConcurrentWeakHashMap(int initialCapacity) { + this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with a default initial capacity (16), + * load factor (0.75) and concurrencyLevel (16). + */ + public ConcurrentWeakHashMap() { + this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new map with the same mappings as the given map. + * The map is created with a capacity of 1.5 times the number + * of mappings in the given map or 16 (whichever is greater), + * and a default load factor (0.75) and concurrencyLevel (16). + * + * @param m the map + */ + public ConcurrentWeakHashMap(Map m) { + this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, + DEFAULT_INITIAL_CAPACITY), + DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + putAll(m); + } + + /** + * Returns true if this map contains no key-value mappings. + * + * @return true if this map contains no key-value mappings + */ + @Override + public boolean isEmpty() { + final Segment[] segments = this.segments; + /* + * We keep track of per-segment modCounts to avoid ABA + * problems in which an element in one segment was added and + * in another removed during traversal, in which case the + * table was never actually empty at any point. Note the + * similar use of modCounts in the size() and containsValue() + * methods, which are the only other methods also susceptible + * to ABA problems. + */ + int[] mc = new int[segments.length]; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + if (segments[i].count != 0) { + return false; + } else { + mcsum += mc[i] = segments[i].modCount; + } + } + // If mcsum happens to be zero, then we know we got a snapshot + // before any modifications at all were made. This is + // probably common enough to bother tracking. + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + if (segments[i].count != 0 || + mc[i] != segments[i].modCount) { + return false; + } + } + } + return true; + } + + /** + * Returns the number of key-value mappings in this map. If the + * map contains more than Integer.MAX_VALUE elements, returns + * Integer.MAX_VALUE. + * + * @return the number of key-value mappings in this map + */ + @Override + public int size() { + final Segment[] segments = this.segments; + long sum = 0; + long check = 0; + int[] mc = new int[segments.length]; + // Try a few times to get accurate count. On failure due to + // continuous async changes in table, resort to locking. + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { + check = 0; + sum = 0; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + sum += segments[i].count; + mcsum += mc[i] = segments[i].modCount; + } + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + check += segments[i].count; + if (mc[i] != segments[i].modCount) { + check = -1; // force retry + break; + } + } + } + if (check == sum) { + break; + } + } + if (check != sum) { // Resort to locking all segments + sum = 0; + for (int i = 0; i < segments.length; ++i) { + segments[i].lock(); + } + for (int i = 0; i < segments.length; ++i) { + sum += segments[i].count; + } + for (int i = 0; i < segments.length; ++i) { + segments[i].unlock(); + } + } + if (sum > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)sum; + } + } + + /** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + * + *

More formally, if this map contains a mapping from a key + * {@code k} to a value {@code v} such that {@code key.equals(k)}, + * then this method returns {@code v}; otherwise it returns + * {@code null}. (There can be at most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ + @Override + public V get(Object key) { + int hash = hash(key.hashCode()); + return segmentFor(hash).get(key, hash); + } + + /** + * Tests if the specified object is a key in this table. + * + * @param key possible key + * @return true if and only if the specified object + * is a key in this table, as determined by the + * equals method; false otherwise. + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean containsKey(Object key) { + int hash = hash(key.hashCode()); + return segmentFor(hash).containsKey(key, hash); + } + + /** + * Returns true if this map maps one or more keys to the + * specified value. Note: This method requires a full internal + * traversal of the hash table, and so is much slower than + * method containsKey. + * + * @param value value whose presence in this map is to be tested + * @return true if this map maps one or more keys to the + * specified value + * @throws NullPointerException if the specified value is null + */ + @Override + public boolean containsValue(Object value) { + if (value == null) { + throw new NullPointerException(); + } + + // See explanation of modCount use above + + final Segment[] segments = this.segments; + int[] mc = new int[segments.length]; + + // Try a few times without locking + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { + int sum = 0; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + int c = segments[i].count; + mcsum += mc[i] = segments[i].modCount; + if (segments[i].containsValue(value)) { + return true; + } + } + boolean cleanSweep = true; + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + int c = segments[i].count; + if (mc[i] != segments[i].modCount) { + cleanSweep = false; + break; + } + } + } + if (cleanSweep) { + return false; + } + } + // Resort to locking all segments + for (int i = 0; i < segments.length; ++i) { + segments[i].lock(); + } + boolean found = false; + try { + for (int i = 0; i < segments.length; ++i) { + if (segments[i].containsValue(value)) { + found = true; + break; + } + } + } finally { + for (int i = 0; i < segments.length; ++i) { + segments[i].unlock(); + } + } + return found; + } + + /** + * Legacy method testing if some key maps into the specified value + * in this table. This method is identical in functionality to + * {@link #containsValue}, and exists solely to ensure + * full compatibility with class {@link java.util.Hashtable}, + * which supported this method prior to introduction of the + * Java Collections framework. + * @param value a value to search for + * @return true if and only if some key maps to the + * value argument in this table as + * determined by the equals method; + * false otherwise + * @throws NullPointerException if the specified value is null + */ + public boolean contains(Object value) { + return containsValue(value); + } + + /** + * Maps the specified key to the specified value in this table. + * Neither the key nor the value can be null. + * + *

The value can be retrieved by calling the get method + * with a key that is equal to the original key. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V put(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hash(key.hashCode()); + return segmentFor(hash).put(key, hash, value, false); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V putIfAbsent(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hash(key.hashCode()); + return segmentFor(hash).put(key, hash, value, true); + } + + /** + * Copies all of the mappings from the specified map to this one. + * These mappings replace any mappings that this map had for any of the + * keys currently in the specified map. + * + * @param m mappings to be stored in this map + */ + @Override + public void putAll(Map m) { + for (Map.Entry e : m.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + /** + * Removes the key (and its corresponding value) from this map. + * This method does nothing if the key is not in the map. + * + * @param key the key that needs to be removed + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key is null + */ + @Override + public V remove(Object key) { + int hash = hash(key.hashCode()); + return segmentFor(hash).remove(key, hash, null, false); + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean remove(Object key, Object value) { + int hash = hash(key.hashCode()); + if (value == null) { + return false; + } + return segmentFor(hash).remove(key, hash, value, false) != null; + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if any of the arguments are null + */ + @Override + public boolean replace(K key, V oldValue, V newValue) { + if (oldValue == null || newValue == null) { + throw new NullPointerException(); + } + int hash = hash(key.hashCode()); + return segmentFor(hash).replace(key, hash, oldValue, newValue); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V replace(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hash(key.hashCode()); + return segmentFor(hash).replace(key, hash, value); + } + + /** + * Removes all of the mappings from this map. + */ + @Override + public void clear() { + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i].clear(); + } + } + + /** + * Returns a {@link Set} view of the keys contained in this map. + * The set is backed by the map, so changes to the map are + * reflected in the set, and vice-versa. The set supports element + * removal, which removes the corresponding mapping from this map, + * via the Iterator.remove, Set.remove, + * removeAll, retainAll, and clear + * operations. It does not support the add or + * addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Set keySet() { + Set ks = this.keySet; + return ks != null ? ks : (this.keySet = new KeySet()); + } + + /** + * Returns a {@link Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are + * reflected in the collection, and vice-versa. The collection + * supports element removal, which removes the corresponding + * mapping from this map, via the Iterator.remove, + * Collection.remove, removeAll, + * retainAll, and clear operations. It does not + * support the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Collection values() { + Collection vs = this.values; + return vs != null ? vs : (this.values = new Values()); + } + + /** + * Returns a {@link Set} view of the mappings contained in this map. + * The set is backed by the map, so changes to the map are + * reflected in the set, and vice-versa. The set supports element + * removal, which removes the corresponding mapping from the map, + * via the Iterator.remove, Set.remove, + * removeAll, retainAll, and clear + * operations. It does not support the add or + * addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Set> entrySet() { + Set> es = this.entrySet; + return es != null ? es : (this.entrySet = new EntrySet()); + } + + /** + * Returns an enumeration of the keys in this table. + * + * @return an enumeration of the keys in this table + * @see #keySet() + */ + public Enumeration keys() { + return new KeyIterator(); + } + + /** + * Returns an enumeration of the values in this table. + * + * @return an enumeration of the values in this table + * @see #values() + */ + public Enumeration elements() { + return new ValueIterator(); + } + + /* ---------------- Iterator Support -------------- */ + + abstract class HashIterator { + int nextSegmentIndex; + int nextTableIndex; + HashEntry[] currentTable; + HashEntry nextEntry; + HashEntry lastReturned; + K currentKey; // Strong reference to weak key (prevents gc) + + HashIterator() { + this.nextSegmentIndex = ConcurrentWeakHashMap.this.segments.length - 1; + this.nextTableIndex = -1; + advance(); + } + + public boolean hasMoreElements() { return hasNext(); } + + final void advance() { + if (this.nextEntry != null && (this.nextEntry = this.nextEntry.next) != null) { + return; + } + + while (this.nextTableIndex >= 0) { + if ( (this.nextEntry = this.currentTable[this.nextTableIndex--]) != null) { + return; + } + } + + while (this.nextSegmentIndex >= 0) { + Segment seg = ConcurrentWeakHashMap.this.segments[this.nextSegmentIndex--]; + if (seg.count != 0) { + this.currentTable = seg.table; + for (int j = this.currentTable.length - 1; j >= 0; --j) { + if ( (this.nextEntry = this.currentTable[j]) != null) { + this.nextTableIndex = j - 1; + return; + } + } + } + } + } + + public boolean hasNext() { + while (this.nextEntry != null) { + if (this.nextEntry.keyRef.get() != null) { + return true; + } + advance(); + } + + return false; + } + + HashEntry nextEntry() { + do { + if (this.nextEntry == null) { + throw new NoSuchElementException(); + } + + this.lastReturned = this.nextEntry; + this.currentKey = this.lastReturned.keyRef.get(); + advance(); + } while (this.currentKey == null); // Skip GC'd keys + + return this.lastReturned; + } + + public void remove() { + if (this.lastReturned == null) { + throw new IllegalStateException(); + } + ConcurrentWeakHashMap.this.remove(this.currentKey); + this.lastReturned = null; + } + } + + final class KeyIterator + extends HashIterator + implements Iterator, Enumeration + { + @Override + public K next() { return super.nextEntry().keyRef.get(); } + @Override + public K nextElement() { return super.nextEntry().keyRef.get(); } + } + + final class ValueIterator + extends HashIterator + implements Iterator, Enumeration + { + @Override + public V next() { return super.nextEntry().value; } + @Override + public V nextElement() { return super.nextEntry().value; } + } + + /* + * This class is needed for JDK5 compatibility. + */ + static class SimpleEntry implements Entry, + java.io.Serializable { + private static final long serialVersionUID = -8499721149061103585L; + + private final K key; + private V value; + + public SimpleEntry(K key, V value) { + this.key = key; + this.value = value; + } + + public SimpleEntry(Entry entry) { + this.key = entry.getKey(); + this.value = entry.getValue(); + } + + @Override + public K getKey() { + return this.key; + } + + @Override + public V getValue() { + return this.value; + } + + @Override + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + @SuppressWarnings("unchecked") + Map.Entry e = (Map.Entry) o; + return eq(this.key, e.getKey()) && eq(this.value, e.getValue()); + } + + @Override + public int hashCode() { + return (this.key == null ? 0 : this.key.hashCode()) + ^ (this.value == null ? 0 : this.value.hashCode()); + } + + @Override + public String toString() { + return this.key + "=" + this.value; + } + + private static boolean eq(Object o1, Object o2) { + return o1 == null ? o2 == null : o1.equals(o2); + } + } + + + /** + * Custom Entry class used by EntryIterator.next(), that relays setValue + * changes to the underlying map. + */ + final class WriteThroughEntry extends SimpleEntry + { + private static final long serialVersionUID = -7900634345345313646L; + + WriteThroughEntry(K k, V v) { + super(k,v); + } + + /** + * Set our entry's value and write through to the map. The + * value to return is somewhat arbitrary here. Since a + * WriteThroughEntry does not necessarily track asynchronous + * changes, the most recent "previous" value could be + * different from what we return (or could even have been + * removed in which case the put will re-establish). We do not + * and cannot guarantee more. + */ + @Override + public V setValue(V value) { + if (value == null) { + throw new NullPointerException(); + } + V v = super.setValue(value); + ConcurrentWeakHashMap.this.put(getKey(), value); + return v; + } + } + + final class EntryIterator + extends HashIterator + implements Iterator> + { + @Override + public Map.Entry next() { + HashEntry e = super.nextEntry(); + return new WriteThroughEntry(e.keyRef.get(), e.value); + } + } + + final class KeySet extends AbstractSet { + @Override + public Iterator iterator() { + return new KeyIterator(); + } + @Override + public int size() { + return ConcurrentWeakHashMap.this.size(); + } + @Override + public boolean isEmpty() { + return ConcurrentWeakHashMap.this.isEmpty(); + } + @Override + public boolean contains(Object o) { + return ConcurrentWeakHashMap.this.containsKey(o); + } + @Override + public boolean remove(Object o) { + return ConcurrentWeakHashMap.this.remove(o) != null; + } + @Override + public void clear() { + ConcurrentWeakHashMap.this.clear(); + } + } + + final class Values extends AbstractCollection { + @Override + public Iterator iterator() { + return new ValueIterator(); + } + @Override + public int size() { + return ConcurrentWeakHashMap.this.size(); + } + @Override + public boolean isEmpty() { + return ConcurrentWeakHashMap.this.isEmpty(); + } + @Override + public boolean contains(Object o) { + return ConcurrentWeakHashMap.this.containsValue(o); + } + @Override + public void clear() { + ConcurrentWeakHashMap.this.clear(); + } + } + + final class EntrySet extends AbstractSet> { + @Override + public Iterator> iterator() { + return new EntryIterator(); + } + @Override + public boolean contains(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry)o; + V v = ConcurrentWeakHashMap.this.get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + @Override + public boolean remove(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry)o; + return ConcurrentWeakHashMap.this.remove(e.getKey(), e.getValue()); + } + @Override + public int size() { + return ConcurrentWeakHashMap.this.size(); + } + @Override + public boolean isEmpty() { + return ConcurrentWeakHashMap.this.isEmpty(); + } + @Override + public void clear() { + ConcurrentWeakHashMap.this.clear(); + } + } + + /* ---------------- Serialization Support -------------- */ + + /** + * Save the state of the ConcurrentWeakHashMap instance to a + * stream (i.e., serialize it). + * @param s the stream + * @serialData + * the key (Object) and value (Object) + * for each key-value mapping, followed by a null pair. + * The key-value mappings are emitted in no particular order. + */ + private void writeObject(java.io.ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + + for (int k = 0; k < this.segments.length; ++k) { + Segment seg = this.segments[k]; + seg.lock(); + try { + HashEntry[] tab = seg.table; + for (int i = 0; i < tab.length; ++i) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + K key = e.keyRef.get(); + if (key == null) { + continue; + } + + s.writeObject(key); + s.writeObject(e.value); + } + } + } finally { + seg.unlock(); + } + } + s.writeObject(null); + s.writeObject(null); + } + + /** + * Reconstitute the ConcurrentWeakHashMap instance from a + * stream (i.e., deserialize it). + * @param s the stream + */ + @SuppressWarnings("unchecked") + private void readObject(java.io.ObjectInputStream s) + throws IOException, ClassNotFoundException { + s.defaultReadObject(); + + // Initialize each segment to be minimally sized, and let grow. + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i].setTable(new HashEntry[1]); + } + + // Read the keys and values, and put the mappings in the table + for (;;) { + K key = (K) s.readObject(); + V value = (V) s.readObject(); + if (key == null) { + break; + } + put(key, value); + } + } +} diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index ad982af..455a97f 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -1,5 +1,5 @@ package net.engio.mbassy.multi.common; -import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Iterator; /** @@ -9,11 +9,11 @@ import java.util.Iterator; * @author bennidi * Date: 2/12/12 */ -public class StrongConcurrentSet extends AbstractConcurrentSet{ +public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - super(new HashMap>()); + super(new IdentityHashMap>()); } @Override diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java b/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java index 85ed9e3..49a991c 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java +++ b/src/main/java/net/engio/mbassy/multi/disruptor/DeadMessage.java @@ -9,21 +9,24 @@ package net.engio.mbassy.multi.disruptor; * @author dorkbox, llc * Date: 2/2/15 */ -final public class DeadMessage { +public final class DeadMessage { - public Object[] relatedMessages = new Object[3]; + private Object[] relatedMessages; public DeadMessage(Object message) { + this.relatedMessages = new Object[1]; this.relatedMessages[0] = message; } public DeadMessage(Object message1, Object message2) { + this.relatedMessages = new Object[2]; this.relatedMessages[0] = message1; this.relatedMessages[1] = message2; } public DeadMessage(Object message1, Object message2, Object message3 ) { + this.relatedMessages = new Object[3]; this.relatedMessages[0] = message1; this.relatedMessages[1] = message2; this.relatedMessages[2] = message3; diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java index a8cb540..6f9c82a 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java +++ b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchHolder.java @@ -6,8 +6,12 @@ package net.engio.mbassy.multi.disruptor; * Date: 2/2/15 */ public class DispatchHolder { + public MessageType messageType = MessageType.ONE; - public Runnable runnable = null; + public Object message1 = null; + public Object message2 = null; + public Object message3 = null; + public Object[] messages = null; public DispatchHolder() { } diff --git a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java index c486e4e..85e0442 100644 --- a/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java +++ b/src/main/java/net/engio/mbassy/multi/disruptor/DispatchProcessor.java @@ -1,41 +1,46 @@ package net.engio.mbassy.multi.disruptor; -import java.util.concurrent.ExecutorService; +import java.util.Collection; +import java.util.Queue; -import com.lmax.disruptor.WorkHandler; +import net.engio.mbassy.multi.MultiMBassador; +import net.engio.mbassy.multi.error.PublicationError; +import net.engio.mbassy.multi.subscription.Subscription; +import net.engio.mbassy.multi.subscription.SubscriptionManager; + +import com.lmax.disruptor.EventHandler; /** * @author dorkbox, llc Date: 2/2/15 */ -public class DispatchProcessor implements WorkHandler { +public class DispatchProcessor implements EventHandler { + private final MultiMBassador publisher; private final long ordinal; private final long numberOfConsumers; - private final ExecutorService invoke_Executor; + private final SubscriptionManager subscriptionManager; +// private final RingBuffer invoke_RingBuffer; + private final Queue queue; + + public DispatchProcessor(final MultiMBassador publisher, final long ordinal, final long numberOfConsumers, + final SubscriptionManager subscriptionManager, Queue queue) { + this.publisher = publisher; - public DispatchProcessor(final long ordinal, final long numberOfConsumers, - final ExecutorService invoke_Executor) { this.ordinal = ordinal; this.numberOfConsumers = numberOfConsumers; - this.invoke_Executor = invoke_Executor; + this.subscriptionManager = subscriptionManager; + this.queue = queue; } - - @Override - public void onEvent(DispatchHolder event) throws Exception { -// if (sequence % this.numberOfConsumers == this.ordinal) { -// System.err.println("handoff -" + this.ordinal); - - this.invoke_Executor.submit(event.runnable); -// event.runnable.run(); - + public void onEvent(DispatchHolder event, long sequence, boolean endOfBatch) throws Exception { + if (sequence % this.numberOfConsumers == this.ordinal) { // Process the event // switch (event.messageType) { // case ONE: { -// publish(event.message1); -// event.message1 = null; // cleanup + publish(event.message1); + event.message1 = null; // cleanup // return; // } // case TWO: { @@ -58,75 +63,75 @@ public class DispatchProcessor implements WorkHandler { // } // } -// } + } } -// private void publish(Object message) { -// Class messageClass = message.getClass(); + private void publish(Object message) { + Class messageClass = message.getClass(); + + SubscriptionManager manager = this.subscriptionManager; + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + + try { + boolean empty = subscriptions.isEmpty(); + if (empty) { + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + message = new DeadMessage(message); + + empty = subscriptions.isEmpty(); + } + + if (!empty) { +// // put this on the disruptor ring buffer +// final RingBuffer ringBuffer = this.invoke_RingBuffer; // -// SubscriptionManager manager = this.subscriptionManager; -// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); -// -// try { -// boolean empty = subscriptions.isEmpty(); -// if (empty) { -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// -// message = new DeadMessage(message); -// -// empty = subscriptions.isEmpty(); -// } -// -// if (!empty) { -//// // put this on the disruptor ring buffer -//// final RingBuffer ringBuffer = this.invoke_RingBuffer; -//// -//// // setup the job -//// final long seq = ringBuffer.next(); -//// try { -//// MessageHolder eventJob = ringBuffer.get(seq); -//// eventJob.messageType = MessageType.ONE; -//// eventJob.message1 = message; -//// eventJob.subscriptions = subscriptions; -//// } catch (Throwable e) { -//// this.publisher.handlePublicationError(new PublicationError() -//// .setMessage("Error while adding an asynchronous message") -//// .setCause(e) -//// .setPublishedObject(message)); -//// } finally { -//// // always publish the job -//// ringBuffer.publish(seq); -//// } -// -// -// -//// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are -//// ObjectPoolHolder messageHolder = this.pool.take(); -//// MessageHolder value = messageHolder.getValue(); -// MessageHolder messageHolder = new MessageHolder(); -// messageHolder.subscriptions= subscriptions; -// messageHolder.messageType = MessageType.ONE; -// messageHolder.message1 = message; -// -//// this.queue.put(messageHolder); -// -//// int counter = 200; -//// while (!this.queue.offer(messageHolder)) { -//// if (counter > 100) { -//// --counter; -//// } else if (counter > 0) { -//// --counter; -//// Thread.yield(); -//// } else { -//// LockSupport.parkNanos(1L); -//// } -//// } -// } -// } catch (Throwable e) { -// this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) -// .setPublishedObject(message)); -// } -// } +// // setup the job +// final long seq = ringBuffer.next(); +// try { +// MessageHolder eventJob = ringBuffer.get(seq); +// eventJob.messageType = MessageType.ONE; +// eventJob.message1 = message; +// eventJob.subscriptions = subscriptions; +// } catch (Throwable e) { +// this.publisher.handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); +// } finally { +// // always publish the job +// ringBuffer.publish(seq); +// } + + + +// // this is what gets parallelized. The collection IS NOT THREAD SAFE, but it's contents are +// ObjectPoolHolder messageHolder = this.pool.take(); +// MessageHolder value = messageHolder.getValue(); + MessageHolder messageHolder = new MessageHolder(); + messageHolder.subscriptions= subscriptions; + messageHolder.messageType = MessageType.ONE; + messageHolder.message1 = message; + +// this.queue.put(messageHolder); + +// int counter = 200; +// while (!this.queue.offer(messageHolder)) { +// if (counter > 100) { +// --counter; +// } else if (counter > 0) { +// --counter; +// Thread.yield(); +// } else { +// LockSupport.parkNanos(1L); +// } +// } + } + } catch (Throwable e) { + this.publisher.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e) + .setPublishedObject(message)); + } + } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 840e34f..604db7b 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -34,11 +34,18 @@ public class Subscription { private final MessageHandler handlerMetadata; private final IHandlerInvocation invocation; +// protected final Collection listeners; +// protected final Map, Boolean> listeners; +// protected final Map listeners; protected final IConcurrentSet listeners; Subscription(MessageHandler handler) { // this.listeners = new WeakConcurrentSet(); this.listeners = new StrongConcurrentSet(); +// this.listeners = new ConcurrentHashMap(); +// this.listeners = new CopyOnWriteArrayList(); +// this.listeners = new ConcurrentSkipListSet(); +// this.listeners = new ConcurrentWeakHashMap, Boolean>(); this.handlerMetadata = handler; IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); @@ -51,23 +58,12 @@ public class Subscription { /** * Check whether this subscription manages a message handler of the given message listener class - * - * @param listener - * @return */ + // only in unit test public boolean belongsTo(Class listener){ return this.handlerMetadata.isFromListener(listener); } - /** - * Check whether this subscriptions manages the given listener instance - * @param listener - * @return - */ - public boolean contains(Object listener){ - return this.listeners.contains(listener); - } - /** Check if this subscription permits sending objects as a VarArg (variable argument) */ public boolean isVarArg() { return this.handlerMetadata.isVarArg(); @@ -105,19 +101,24 @@ public class Subscription { return this.handlerMetadata.handlesMessage(messageTypes); } - public Class[] getHandledMessageTypes(){ + public Class[] getHandledMessageTypes() { return this.handlerMetadata.getHandledMessages(); } public void subscribe(Object listener) { +// this.listeners.put(listener, Boolean.TRUE); this.listeners.add(listener); } - /** * @return TRUE if the element was removed */ public boolean unsubscribe(Object existingListener) { +// Boolean remove = this.listeners.remove(existingListener); +// if (remove != null) { +// return true; +// } +// return false; return this.listeners.remove(existingListener); } @@ -125,15 +126,22 @@ public class Subscription { return this.listeners.isEmpty(); } + // only used in unit-test public int size() { return this.listeners.size(); } +// private AtomicLong counter = new AtomicLong(); public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { - if (this.listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); +// Collection listeners = this.listeners.keySet(); +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; - for (Object listener : this.listeners) { + if (listeners.size() > 0) { + Method handler = this.handlerMetadata.getHandler(); +// int count = 0; + for (Object listener : listeners) { +// count++; try { this.invocation.invoke(listener, handler, message); } catch (IllegalAccessException e) { @@ -171,14 +179,19 @@ public class Subscription { .setPublishedObject(message)); } } +// this.counter.getAndAdd(count); } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { - if (this.listeners.size() > 0) { +// Collection listeners = this.listeners.keySet(); +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); - for (Object listener : this.listeners) { + for (Object listener : listeners) { try { this.invocation.invoke(listener, handler, message1, message2); } catch (IllegalAccessException e) { @@ -224,10 +237,14 @@ public class Subscription { } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { - if (this.listeners.size() > 0) { +// Collection listeners = this.listeners.keySet(); +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); - for (Object listener : this.listeners) { + for (Object listener : listeners) { try { this.invocation.invoke(listener, handler, message1, message2, message3); } catch (IllegalAccessException e) { @@ -275,10 +292,14 @@ public class Subscription { } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { - if (this.listeners.size() > 0) { +// Collection listeners = this.listeners.keySet(); +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); - for (Object listener : this.listeners) { + for (Object listener : listeners) { try { this.invocation.invoke(listener, handler, messages); } catch (IllegalAccessException e) { diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 57085ba..45b1e77 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -13,15 +13,11 @@ import java.util.concurrent.ConcurrentHashMap; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; -import net.engio.mbassy.multi.common.SubscriptionPoolable; import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; -import dorkbox.util.objectPool.ObjectPool; -import dorkbox.util.objectPool.ObjectPoolFactory; - /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known @@ -35,13 +31,18 @@ import dorkbox.util.objectPool.ObjectPoolFactory; */ public class SubscriptionManager { + public static class SubHolder { + public int count = 0; + public Collection subs = new ArrayDeque(0); + } + // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, Collection> subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); + private final Map, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap, SubHolder>(50); private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // all subscriptions per messageHandler type @@ -64,10 +65,7 @@ public class SubscriptionManager { // synchronize read/write acces to the subscription maps private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); - private ObjectPool> pool; - public SubscriptionManager() { - this.pool = ObjectPoolFactory.create(new SubscriptionPoolable(), 1024); } public void unsubscribe(Object listener) { @@ -97,13 +95,16 @@ public class SubscriptionManager { Class clazz = handledMessageTypes[0]; // NOTE: Not thread-safe! must be synchronized in outer scope - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs != null) { - subs.remove(subscription); + SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); + if (subHolder != null) { + Collection subs = subHolder.subs; + if (subs != null) { + subs.remove(subscription); - if (subs.isEmpty()) { - // remove element - this.subscriptionsPerMessageSingle.remove(clazz); + if (subs.isEmpty()) { + // remove element + this.subscriptionsPerMessageSingle.remove(clazz); + } } } } else { @@ -203,12 +204,14 @@ public class SubscriptionManager { Class clazz = handledMessageTypes[0]; // NOTE: Not thread-safe! must be synchronized in outer scope - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs == null) { - subs = new ArrayList(); - this.subscriptionsPerMessageSingle.put(clazz, subs); + SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); + if (subHolder == null) { + subHolder = new SubHolder(); + this.subscriptionsPerMessageSingle.put(clazz, subHolder); } + Collection subs = subHolder.subs; subs.add(subscription); + subHolder.count++; // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive if (subscription.isVarArg()) { @@ -288,50 +291,61 @@ public class SubscriptionManager { } } - private final ThreadLocal> subscriptionCache = new ThreadLocal>() { - @Override - protected Collection initialValue() - { - return new ArrayDeque(16); // default - } - }; - // obtain the set of subscriptions for the given message type // Note: never returns null! public Collection getSubscriptionsByMessageType(Class messageType) { // thread safe publication - Collection subscriptions = this.subscriptionCache.get(); - subscriptions.clear(); + Collection subscriptions; try { this.LOCK.readLock().lock(); - Collection subs = this.subscriptionsPerMessageSingle.get(messageType); - if (subs != null) { - subscriptions.addAll(subs); + int count = 0; + Collection subs; + SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType); + if (primaryHolder != null) { + subscriptions = new ArrayDeque(count); + subs = primaryHolder.subs; + count = primaryHolder.count; + if (subs != null) { + subscriptions.addAll(subs); + } + } else { + subscriptions = new ArrayDeque(16); } // also add all subscriptions that match super types + SubHolder subHolder; ArrayList> types1 = superClassCache(messageType); if (types1 != null) { Class eventSuperType; int i; for (i = 0; i < types1.size(); i++) { eventSuperType = types1.get(i); - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageType)) { + subscriptions.add(sub); + } } } } - addVarArgClass(subscriptions, eventSuperType); + count += addVarArgClass(subscriptions, eventSuperType); } } - addVarArgClass(subscriptions, messageType); + count += addVarArgClass(subscriptions, messageType); + + if (primaryHolder != null) { + // save off our count, so our collection creation size is optimal. + primaryHolder.count = count; + } } finally { this.LOCK.readLock().unlock(); } @@ -487,6 +501,8 @@ public class SubscriptionManager { try { this.LOCK.readLock().lock(); + int count = 16; + // NOTE: Not thread-safe! must be synchronized in outer scope Collection subs = this.subscriptionsPerMessageMulti.getValue(messageTypes); if (subs != null) { @@ -497,7 +513,7 @@ public class SubscriptionManager { } } - + SubHolder subHolder; int size = messageTypes.length; if (size > 0) { boolean allSameType = true; @@ -536,10 +552,14 @@ public class SubscriptionManager { eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } @@ -547,6 +567,8 @@ public class SubscriptionManager { } } + + } finally { this.LOCK.readLock().unlock(); } @@ -571,19 +593,27 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - private void addVarArgClass(Collection subscriptions, Class messageType) { + private int addVarArgClass(Collection subscriptions, Class messageType) { // tricky part. We have to check the ARRAY version + SubHolder subHolder; Collection subs; + int count = 0; + Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } + return count; } public Class getVarArg(Class clazz) { @@ -594,16 +624,23 @@ public class SubscriptionManager { // a var-arg handler might match // tricky part. We have to check the ARRAY version /////////////// - private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { + private int addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { Collection subs; + SubHolder subHolder; + int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } @@ -612,14 +649,21 @@ public class SubscriptionManager { varArgClass = this.varArgClasses.get(eventSuperType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } } + + return count; } private void getSubsVarArg(Collection subscriptions, int length, int index,