diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 842cd5d..3026a93 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -15,34 +15,22 @@ */ package dorkbox.util.messagebus; -import com.lmax.disruptor.EventBusFactory; -import com.lmax.disruptor.LiteBlockingWaitStrategy; -import com.lmax.disruptor.PublicationExceptionHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.Sequencer; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.WorkProcessor; -import dorkbox.util.messagebus.common.thread.NamedThreadFactory; import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.error.PublicationError; +import dorkbox.util.messagebus.synchrony.AsyncDisruptor; +import dorkbox.util.messagebus.synchrony.Sync; +import dorkbox.util.messagebus.synchrony.Synchrony; import dorkbox.util.messagebus.publication.Publisher; import dorkbox.util.messagebus.publication.PublisherExact; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.SubscriptionManager; -import dorkbox.util.messagebus.utils.ClassUtils; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; /** - * The base class for all message bus implementations with support for asynchronous message dispatch + * The base class for all message bus implementations with support for asynchronous message dispatch. + * + * See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html + * tldr; we use single-writer-principle + Atomic.lazySet * * @author dorkbox, llc * Date: 2/2/15 @@ -50,24 +38,13 @@ import java.util.concurrent.locks.LockSupport; public class MessageBus implements IMessageBus { private final ErrorHandlingSupport errorHandler; -// private final LinkedBlockingQueue dispatchQueue; -// private final ArrayBlockingQueue dispatchQueue; -// private final LinkedTransferQueue dispatchQueue; -// private final Collection threads; - private final ClassUtils classUtils; private final SubscriptionManager subscriptionManager; private final Publisher publisher; + private final Synchrony syncPublication; + private final Synchrony asyncPublication; - /** - * Notifies the consumers during shutdown, that it's on purpose. - */ - private volatile boolean shuttingDown = false; - private WorkProcessor[] workProcessors; - private MessageHandler[] handlers; - private RingBuffer ringBuffer; - private Sequence workSequence; /** * By default, will permit subTypes and Varity Argument matching, and will use half of CPUs available for dispatching async messages @@ -84,7 +61,8 @@ class MessageBus implements IMessageBus { */ public MessageBus(int numberOfThreads) { - this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads); +// this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads); + this(PublishMode.ExactWithSuperTypes, numberOfThreads); } /** @@ -106,196 +84,29 @@ class MessageBus implements IMessageBus { numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1)); this.errorHandler = new DefaultErrorHandler(); -// this.dispatchQueue = new LinkedBlockingQueue(1024); -// this.dispatchQueue = new ArrayBlockingQueue(1024); -// this.dispatchQueue = new LinkedTransferQueue(); - classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final Subscriber subscriber; - /** - * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) - */ - subscriber = new Subscriber(errorHandler, classUtils); + /** + * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) + */ + this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler); switch (publishMode) { case Exact: - publisher = new PublisherExact(errorHandler, subscriber); + publisher = new PublisherExact(errorHandler, subscriptionManager); break; case ExactWithSuperTypes: - publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber); + publisher = new PublisherExactWithSuperTypes(errorHandler, subscriptionManager); break; case ExactWithSuperTypesAndVarity: default: - publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber); + publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriptionManager); } - this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber); - - - // Now we setup the disruptor and work handlers - - ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, - 0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter - new java.util.concurrent.LinkedTransferQueue(), - new NamedThreadFactory("MessageBus")); - - final PublicationExceptionHandler exceptionHandler = new PublicationExceptionHandler(errorHandler); - EventBusFactory factory = new EventBusFactory(); - - // setup the work handlers - handlers = new MessageHandler[numberOfThreads]; - for (int i = 0; i < handlers.length; i++) { - handlers[i] = new MessageHandler(publisher); // exactly one per thread is used - } - - -// final int BUFFER_SIZE = ringBufferSize * 64; -// final int BUFFER_SIZE = 1024 * 64; -// final int BUFFER_SIZE = 1024; -// final int BUFFER_SIZE = 16; - final int BUFFER_SIZE = 8; - - - WaitStrategy consumerWaitStrategy; - consumerWaitStrategy = new LiteBlockingWaitStrategy(); -// consumerWaitStrategy = new BlockingWaitStrategy(); -// consumerWaitStrategy = new YieldingWaitStrategy(); -// consumerWaitStrategy = new BusySpinWaitStrategy(); -// consumerWaitStrategy = new SleepingWaitStrategy(); -// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); -// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); -// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); - - - ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); - SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); - - - // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item - final int numWorkers = handlers.length; - workProcessors = new WorkProcessor[numWorkers]; - workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - - for (int i = 0; i < numWorkers; i++) { - workProcessors[i] = new WorkProcessor(ringBuffer, - sequenceBarrier, - handlers[i], - exceptionHandler, workSequence); - } - - // setup the WorkProcessor sequences (control what is consumed from the ring buffer) - final Sequence[] sequences = getSequences(); - ringBuffer.addGatingSequences(sequences); - - - // configure the start position for the WorkProcessors, and start them - final long cursor = ringBuffer.getCursor(); - workSequence.set(cursor); - - for (WorkProcessor processor : workProcessors) { - processor.getSequence() - .set(cursor); - executor.execute(processor); - } - - - -// this.threads = new ArrayDeque(numberOfThreads); -// final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); -// for (int i = 0; i < numberOfThreads; i++) { -// -// // each thread will run forever and process incoming message publication requests -// Runnable runnable = new Runnable() { -// @Override -// public -// void run() { -//// LinkedBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; -//// ArrayBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; -// LinkedTransferQueue IN_QUEUE = MessageBus.this.dispatchQueue; -// -// MultiNode node = new MultiNode(); -// while (!MessageBus.this.shuttingDown) { -// try { -// //noinspection InfiniteLoopStatement -// while (true) { -//// IN_QUEUE.take(node); -// final Object take = IN_QUEUE.take(); -//// Integer type = (Integer) MultiNode.lpMessageType(node); -//// switch (type) { -//// case 1: { -// publish(take); -//// break; -//// } -//// case 2: { -//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node)); -//// break; -//// } -//// case 3: { -//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)); -//// break; -//// } -//// default: { -//// publish(MultiNode.lpItem1(node)); -//// } -//// } -// } -// } catch (InterruptedException e) { -// if (!MessageBus.this.shuttingDown) { -// Integer type = (Integer) MultiNode.lpMessageType(node); -// switch (type) { -// case 1: { -// errorHandler.handlePublicationError(new PublicationError().setMessage( -// "Thread interrupted while processing message") -// .setCause(e) -// .setPublishedObject(MultiNode.lpItem1(node))); -// break; -// } -// case 2: { -// errorHandler.handlePublicationError(new PublicationError().setMessage( -// "Thread interrupted while processing message") -// .setCause(e) -// .setPublishedObject(MultiNode.lpItem1(node), -// MultiNode.lpItem2(node))); -// break; -// } -// case 3: { -// errorHandler.handlePublicationError(new PublicationError().setMessage( -// "Thread interrupted while processing message") -// .setCause(e) -// .setPublishedObject(MultiNode.lpItem1(node), -// MultiNode.lpItem2(node), -// MultiNode.lpItem3(node))); -// break; -// } -// default: { -// errorHandler.handlePublicationError(new PublicationError().setMessage( -// "Thread interrupted while processing message") -// .setCause(e) -// .setPublishedObject(MultiNode.lpItem1(node))); -// } -// } -// } -// } -// } -// } -// }; -// -// Thread runner = threadFactory.newThread(runnable); -// this.threads.add(runner); -// } - } - - // gets the sequences used for processing work - private - Sequence[] getSequences() { - final Sequence[] sequences = new Sequence[workProcessors.length + 1]; - for (int i = 0, size = workProcessors.length; i < size; i++) { - sequences[i] = workProcessors[i].getSequence(); - } - sequences[sequences.length - 1] = workSequence; // always add the work sequence - return sequences; + syncPublication = new Sync(); +// asyncPublication = new PubAsync(numberOfThreads, errorHandler, publisher, syncPublication); + asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, publisher, syncPublication); } /** @@ -324,57 +135,31 @@ class MessageBus implements IMessageBus { @Override public void publish(final Object message) { - publisher.publish(message); + publisher.publish(syncPublication, message); } @Override public void publish(final Object message1, final Object message2) { - publisher.publish(message1, message2); + publisher.publish(syncPublication, message1, message2); } @Override public void publish(final Object message1, final Object message2, final Object message3) { - publisher.publish(message1, message2, message3); + publisher.publish(syncPublication, message1, message2, message3); } @Override public void publish(final Object[] messages) { - publisher.publish(messages); + publisher.publish(syncPublication, messages); } @Override public void publishAsync(final Object message) { - if (message != null) { - final long seq = ringBuffer.next(); - - try { - MessageHolder job = ringBuffer.get(seq); - job.type = MessageType.ONE; - job.message1 = message; - } catch (Exception e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } finally { - // always publish the job - ringBuffer.publish(seq); - } - } - else { - throw new NullPointerException("Message cannot be null."); - } - -// try { -// this.dispatchQueue.transfer(message); -//// this.dispatchQueue.put(message); -// } catch (Exception e) { -// errorHandler.handlePublicationError(new PublicationError().setMessage( -// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message)); -// } + publisher.publish(asyncPublication, message); } @Override @@ -428,18 +213,7 @@ class MessageBus implements IMessageBus { @Override public final boolean hasPendingMessages() { - // from workerPool.drainAndHalt() - Sequence[] workerSequences = getSequences(); - final long cursor = ringBuffer.getCursor(); - for (Sequence s : workerSequences) { - if (cursor > s.get()) { - return true; - } - } - - return false; - -// return !this.dispatchQueue.isEmpty(); + return asyncPublication.hasPendingMessages(); } @Override @@ -451,36 +225,14 @@ class MessageBus implements IMessageBus { @Override public void start() { - if (shuttingDown) { - throw new Error("Unable to restart the MessageBus"); - } errorHandler.init(); - -// for (Thread t : this.threads) { -// t.start(); -// } + asyncPublication.start(); } @Override public void shutdown() { - this.shuttingDown = true; - -// for (Thread t : this.threads) { -// t.interrupt(); -// } - - for (WorkProcessor processor : workProcessors) { - processor.halt(); - } - - for (MessageHandler handler : handlers) { - while (!handler.isShutdown()) { - LockSupport.parkNanos(100L); // wait 100ms for handlers to quit - } - } - + this.asyncPublication.shutdown(); this.subscriptionManager.shutdown(); - this.classUtils.clear(); } } diff --git a/src/dorkbox/util/messagebus/MpmcMultiTransferArrayQueue.java b/src/dorkbox/util/messagebus/MpmcMultiTransferArrayQueue.java index 8cf0da1..504a106 100644 --- a/src/dorkbox/util/messagebus/MpmcMultiTransferArrayQueue.java +++ b/src/dorkbox/util/messagebus/MpmcMultiTransferArrayQueue.java @@ -15,6 +15,7 @@ */ package dorkbox.util.messagebus; +import dorkbox.util.messagebus.publication.disruptor.MessageType; import org.jctools.queues.MpmcArrayQueue; import org.jctools.util.UnsafeAccess; diff --git a/src/dorkbox/util/messagebus/MultiNode.java b/src/dorkbox/util/messagebus/MultiNode.java index 04f454e..40741fb 100644 --- a/src/dorkbox/util/messagebus/MultiNode.java +++ b/src/dorkbox/util/messagebus/MultiNode.java @@ -15,6 +15,7 @@ */ package dorkbox.util.messagebus; +import dorkbox.util.messagebus.publication.disruptor.MessageType; import org.jctools.util.UnsafeAccess; abstract class ColdItems { diff --git a/src/dorkbox/util/messagebus/common/AbstractConcurrentSet.java b/src/dorkbox/util/messagebus/common/AbstractConcurrentSet.java index 11d890a..62431a4 100644 --- a/src/dorkbox/util/messagebus/common/AbstractConcurrentSet.java +++ b/src/dorkbox/util/messagebus/common/AbstractConcurrentSet.java @@ -22,12 +22,11 @@ */ package dorkbox.util.messagebus.common; -import dorkbox.util.messagebus.common.adapter.StampedLock; - import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.StampedLock; /** * This data structure is optimized for non-blocking reads even when write operations occur. diff --git a/src/dorkbox/util/messagebus/common/HashMapTree.java b/src/dorkbox/util/messagebus/common/HashMapTree.java index 1116061..8414f2c 100644 --- a/src/dorkbox/util/messagebus/common/HashMapTree.java +++ b/src/dorkbox/util/messagebus/common/HashMapTree.java @@ -17,39 +17,57 @@ package dorkbox.util.messagebus.common; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; /** * Simple tree structure that is a map that contains a chain of keys to publish to a value. - *

- * NOT THREAD SAFE, each call must be protected by a read/write lock of some sort * * @author dorkbox, llc * Date: 2/2/15 */ public class HashMapTree { - private Map> children; + public static int INITIAL_SIZE = 4; + public static float LOAD_FACTOR = 0.8F; + + + private static + final ThreadLocal keyCache = new ThreadLocal() { + @Override + protected + Object initialValue() { + return JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1); + } + }; + + private static + final ThreadLocal valueCache = new ThreadLocal() { + @Override + protected + Object initialValue() { + return new HashMapTree(); + } + }; + +// Map> + private AtomicReference children = new AtomicReference(); private VALUE value; - private final int defaultSize; - private final float loadFactor; - public HashMapTree(final int defaultSize, final float loadFactor) { - this.defaultSize = defaultSize; - this.loadFactor = loadFactor; + + @SuppressWarnings("unchecked") + public static ConcurrentMap> cast(Object o) { + return (ConcurrentMap>) o; } - /** - * can be overridden to provide a custom backing map - */ - protected Map> createChildren(int defaultSize, float loadFactor) { - return JavaVersionAdapter.concurrentMap(defaultSize, loadFactor, 1); + + + public HashMapTree() { } + public final VALUE getValue() { return this.value; } @@ -66,143 +84,19 @@ public class HashMapTree { public final void clear() { - if (this.children != null) { - Set>> entrySet = this.children.entrySet(); - for (Entry> entry : entrySet) { - entry.getValue().clear(); - } - - this.children.clear(); - this.value = null; - } +// if (this.children != null) { +// Set>> entrySet = this.children.entrySet(); +// for (Entry> entry : entrySet) { +// entry.getValue().clear(); +// } +// +// this.children.clear(); +// this.value = null; +// } } - /** - * This IS NOT safe to call outside of root.remove(...) - *

- * Removes a branch from the tree, and cleans up, if necessary - */ - public final void remove(KEY key) { - if (key != null) { - removeLeaf(key); - } - } - - - /** - * This IS NOT safe to call outside of root.remove(...) - *

- * Removes a branch from the tree, and cleans up, if necessary - */ - public final void remove(KEY key1, KEY key2) { - if (key1 == null || key2 == null) { - return; - } - - HashMapTree leaf; - if (this.children != null) { - leaf = this.children.get(key1); - - if (leaf != null) { - leaf.removeLeaf(key2); - this.children.remove(key1); - - if (this.children.isEmpty()) { - this.children = null; - } - } - } - } - - /** - * This IS NOT safe to call outside of root.remove(...) - *

- * Removes a branch from the tree, and cleans up, if necessary - */ - public final void remove(KEY key1, KEY key2, KEY key3) { - if (key1 == null || key2 == null) { - return; - } - - HashMapTree leaf; - if (this.children != null) { - leaf = this.children.get(key1); - - if (leaf != null) { - leaf.remove(key2, key3); - this.children.remove(key1); - - if (this.children.isEmpty()) { - this.children = null; - } - } - } - } - - - /** - * This IS NOT safe to call outside of root.remove(...) - *

- * Removes a branch from the tree, and cleans up, if necessary - */ - @SuppressWarnings("unchecked") - public final void remove(KEY... keys) { - if (keys == null) { - return; - } - - removeLeaf(0, keys); - } - - - /** - * Removes a branch from the tree, and cleans up, if necessary - */ - private void removeLeaf(KEY key) { - if (key != null) { - if (this.children != null) { - HashMapTree leaf = this.children.get(key); - - if (leaf != null) { - leaf = this.children.get(key); - if (leaf != null) { - if (leaf.children == null && leaf.value == null) { - this.children.remove(key); - } - - if (this.children.isEmpty()) { - this.children = null; - } - } - } - } - } - } - - // keys CANNOT be null here! - private void removeLeaf(int index, KEY[] keys) { - if (index == keys.length) { - // we have reached the leaf to remove! - this.value = null; - this.children = null; - } else if (this.children != null) { - HashMapTree leaf = this.children.get(keys[index]); - - if (leaf != null) { - leaf.removeLeaf(index+1, keys); - if (leaf.children == null && leaf.value == null) { - this.children.remove(keys[index]); - } - - if (this.children.isEmpty()) { - this.children = null; - } - } - } - } - - public final VALUE put(VALUE value, KEY key) { + public final VALUE put(KEY key, VALUE value) { if (key == null) { throw new NullPointerException("keys"); } @@ -282,34 +176,62 @@ public class HashMapTree { } - private HashMapTree createLeaf(KEY key) { + + /** + * creates a child (if necessary) in an atomic way. The tree returned will either be the current one, or a new one. + * + * @param key the key for the new child + * @return + */ + @SuppressWarnings("unchecked") + private + HashMapTree createLeaf(KEY key) { if (key == null) { return null; } - HashMapTree objectTree; + final Object cached = keyCache.get(); + final Object checked = children.get(); + ConcurrentMap> kids; - if (this.children == null) { - this.children = createChildren(this.defaultSize, this.loadFactor); + if (checked == null) { + final boolean success = children.compareAndSet(null, cached); + if (success) { + keyCache.set(JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1)); + kids = cast(cached); + } + else { + kids = cast(children.get()); + } + } + else { + kids = cast(checked); } - objectTree = this.children.get(key); - // make sure we have a tree for the specified node - if (objectTree == null) { - objectTree = new HashMapTree(this.defaultSize, this.loadFactor); - this.children.put(key, objectTree); + final Object cached2 = valueCache.get(); + final HashMapTree tree = kids.putIfAbsent(key, (HashMapTree) cached2); + if (tree == null) { + // was absent + valueCache.set(new HashMapTree()); + return (HashMapTree) cached2; + } + else { + return tree; } - - return objectTree; } + + + + ///////////////////////////////////////// ///////////////////////////////////////// ///////////////////////////////////////// ///////////////////////////////////////// + public final VALUE get(KEY key) { if (key == null) { return null; @@ -317,7 +239,7 @@ public class HashMapTree { HashMapTree objectTree; // publish value from our children - objectTree = getLeaf(key); // protected by lock + objectTree = getLeaf(key); if (objectTree == null) { return null; @@ -331,7 +253,7 @@ public class HashMapTree { // publish value from our children tree = getLeaf(key1); // protected by lock if (tree != null) { - tree = tree.getLeaf(key2); // protected by lock + tree = tree.getLeaf(key2); } if (tree == null) { @@ -392,12 +314,14 @@ public class HashMapTree { if (this.children == null) { tree = null; } else { - tree = this.children.get(key); + final ConcurrentMap> o = cast(this.children.get()); + tree = o.get(key); } return tree; } + public final HashMapTree getLeaf(KEY key1, KEY key2) { HashMapTree tree; @@ -447,4 +371,135 @@ public class HashMapTree { return tree; } + + + + ///////////////////////////////////////// + ///////////////////////////////////////// + ///////////////////////////////////////// + ///////////////////////////////////////// + + + + /** + * This IS NOT safe to call outside of root.remove(...) + *

+ * Removes a branch from the tree, and cleans up, if necessary + */ + public final void remove(KEY key) { + if (key != null) { + removeLeaf(key); + } + } + + + /** + * This IS NOT safe to call outside of root.remove(...) + *

+ * Removes a branch from the tree, and cleans up, if necessary + */ + public final void remove(KEY key1, KEY key2) { + if (key1 == null || key2 == null) { + return; + } + + HashMapTree leaf; + if (this.children != null) { + leaf = getLeaf(key1); + + if (leaf != null) { + leaf.removeLeaf(key2); + + final ConcurrentMap> o = cast(this.children.get()); + o.remove(key1); + + if (o.isEmpty()) { + this.children.compareAndSet(o, null); + } + } + } + } + + /** + * This IS NOT safe to call outside of root.remove(...) + *

+ * Removes a branch from the tree, and cleans up, if necessary + */ + public final void remove(KEY key1, KEY key2, KEY key3) { + if (key1 == null || key2 == null) { + return; + } + +// HashMapTree leaf; +// if (this.children != null) { +// leaf = getLeaf(key1); +// +// if (leaf != null) { +// leaf.remove(key2, key3); +// this.children.remove(key1); +// +// if (this.children.isEmpty()) { +// this.children = null; +// } +// } +// } + } + + + /** + * This IS NOT safe to call outside of root.remove(...) + *

+ * Removes a branch from the tree, and cleans up, if necessary + */ + @SuppressWarnings("unchecked") + public final void remove(KEY... keys) { + if (keys == null) { + return; + } + + removeLeaf(0, keys); + } + + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + private void removeLeaf(KEY key) { + if (key != null) { + final ConcurrentMap> kids = cast(this.children.get()); + + if (kids != null) { + kids.remove(key); + + if (kids.isEmpty()) { + this.children.compareAndSet(kids, null); + } + } + } + } + + // keys CANNOT be null here! + private void removeLeaf(int index, KEY[] keys) { + if (index == keys.length) { + // we have reached the leaf to remove! + this.value = null; + this.children = null; + } else { + final ConcurrentMap> kids = cast(this.children.get()); + if (kids != null) { +// HashMapTree leaf = this.children.get(keys[index]); + +// if (leaf != null) { +// leaf.removeLeaf(index + 1, keys); +// if (leaf.children == null && leaf.value == null) { +// this.children.remove(keys[index]); +// } +// +// if (this.children.isEmpty()) { +// this.children = null; +// } +// } + } + } + } } diff --git a/src/dorkbox/util/messagebus/common/WeakConcurrentSet.java b/src/dorkbox/util/messagebus/common/WeakConcurrentSet.java index ecfd942..b230a12 100644 --- a/src/dorkbox/util/messagebus/common/WeakConcurrentSet.java +++ b/src/dorkbox/util/messagebus/common/WeakConcurrentSet.java @@ -22,11 +22,11 @@ */ package dorkbox.util.messagebus.common; -import dorkbox.util.messagebus.common.adapter.StampedLock; import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.WeakHashMap; +import java.util.concurrent.locks.StampedLock; /** * This implementation uses weak references to the elements. Iterators automatically perform cleanups of diff --git a/src/dorkbox/util/messagebus/publication/Publisher.java b/src/dorkbox/util/messagebus/publication/Publisher.java index 3d05bdd..49e4a7d 100644 --- a/src/dorkbox/util/messagebus/publication/Publisher.java +++ b/src/dorkbox/util/messagebus/publication/Publisher.java @@ -15,12 +15,11 @@ */ package dorkbox.util.messagebus.publication; +import dorkbox.util.messagebus.synchrony.Synchrony; + public interface Publisher { - void publish(Object message1); - - void publish(Object message1, Object message2); - - void publish(Object message1, Object message2, Object message3); - - void publish(Object[] messages); + void publish(final Synchrony synchrony, Object message1); + void publish(final Synchrony synchrony, Object message1, Object message2); + void publish(final Synchrony synchrony, Object message1, Object message2, Object message3); + void publish(final Synchrony synchrony, Object[] messages); } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java index 8381a39..c49662e 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java @@ -18,28 +18,29 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; +import dorkbox.util.messagebus.synchrony.Synchrony; @SuppressWarnings("Duplicates") public class PublisherExact implements Publisher { private final ErrorHandlingSupport errorHandler; - private final Subscriber subscriber; + private final SubscriptionManager subManager; public - PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { + PublisherExact(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) { this.errorHandler = errorHandler; - this.subscriber = subscriber; + this.subManager = subManager; } @Override public - void publish(final Object message1) { + void publish(final Synchrony synchrony, final Object message1) { try { final Class messageClass = message1.getClass(); - final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + final Subscription[] subscriptions = subManager.getExact(messageClass); // can return null // Run subscriptions if (subscriptions != null) { @@ -51,7 +52,7 @@ class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -72,14 +73,14 @@ class PublisherExact implements Publisher { @Override public - void publish(final Object message1, final Object message2) { + void publish(final Synchrony synchrony, final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null + final Subscription[] subscriptions = subManager.getExact(messageClass1, messageClass2); // can return null // lock.unlockRead(stamp); // Run subscriptions @@ -93,7 +94,7 @@ class PublisherExact implements Publisher { else { // Dead Event must EXACTLY MATCH (no subclasses) // stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -115,7 +116,7 @@ class PublisherExact implements Publisher { @Override public - void publish(final Object message1, final Object message2, final Object message3) { + void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); @@ -123,7 +124,7 @@ class PublisherExact implements Publisher { // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null + final Subscription[] subscriptions = subManager.getExact(messageClass1, messageClass2, messageClass3); // can return null // lock.unlockRead(stamp); // Run subscriptions @@ -137,7 +138,7 @@ class PublisherExact implements Publisher { else { // Dead Event must EXACTLY MATCH (no subclasses) // stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -159,7 +160,7 @@ class PublisherExact implements Publisher { @Override public - void publish(final Object[] messages) { - publish((Object) messages); + void publish(final Synchrony synchrony, final Object[] messages) { + publish(synchrony, (Object) messages); } } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java index 5423a32..bbbbf3b 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java @@ -18,54 +18,45 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; +import dorkbox.util.messagebus.synchrony.Synchrony; @SuppressWarnings("Duplicates") public class PublisherExactWithSuperTypes implements Publisher { + private final ErrorHandlingSupport errorHandler; - private final Subscriber subscriber; -// private final StampedLock lock; + private final SubscriptionManager subManager; public - PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { + PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) { this.errorHandler = errorHandler; - this.subscriber = subscriber; + this.subManager = subManager; } @Override public - void publish(final Object message1) { + void publish(final Synchrony synchrony, final Object message1) { try { final Class messageClass = message1.getClass(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null // Run subscriptions if (subscriptions != null) { - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message1); - } + synchrony.publish(subscriptions, message1); } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } + synchrony.publish(deadSubscriptions, new DeadMessage(message1)); } } } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) .setPublishedObject(message1)); } @@ -73,14 +64,14 @@ class PublisherExactWithSuperTypes implements Publisher { @Override public - void publish(final Object message1, final Object message2) { + void publish(final Synchrony synchrony, final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null + final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2); // can return null // lock.unlockRead(stamp); // Run subscriptions @@ -94,7 +85,7 @@ class PublisherExactWithSuperTypes implements Publisher { else { // Dead Event must EXACTLY MATCH (no subclasses) // stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -116,7 +107,7 @@ class PublisherExactWithSuperTypes implements Publisher { @Override public - void publish(final Object message1, final Object message2, final Object message3) { + void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); @@ -125,7 +116,7 @@ class PublisherExactWithSuperTypes implements Publisher { // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2, + final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null // lock.unlockRead(stamp); @@ -140,7 +131,7 @@ class PublisherExactWithSuperTypes implements Publisher { else { // Dead Event must EXACTLY MATCH (no subclasses) // stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -162,7 +153,7 @@ class PublisherExactWithSuperTypes implements Publisher { @Override public - void publish(final Object[] messages) { - publish((Object) messages); + void publish(final Synchrony synchrony, final Object[] messages) { + publish(synchrony, (Object) messages); } } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java index cc1500e..52cc5b7 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java @@ -18,8 +18,9 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; +import dorkbox.util.messagebus.synchrony.Synchrony; import dorkbox.util.messagebus.utils.VarArgUtils; import java.lang.reflect.Array; @@ -30,45 +31,41 @@ public class PublisherExactWithSuperTypesAndVarity implements Publisher { private final ErrorHandlingSupport errorHandler; - private final Subscriber subscriber; + private final SubscriptionManager subManager; private final AtomicBoolean varArgPossibility; final VarArgUtils varArgUtils; public - PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { + PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) { this.errorHandler = errorHandler; - this.subscriber = subscriber; + this.subManager = subManager; - varArgPossibility = subscriber.getVarArgPossibility(); - varArgUtils = subscriber.getVarArgUtils(); + varArgPossibility = subManager.getVarArgPossibility(); + varArgUtils = subManager.getVarArgUtils(); } @Override public - void publish(final Object message1) { + void publish(final Synchrony synchrony, final Object message1) { try { final Class messageClass = message1.getClass(); final boolean isArray = messageClass.isArray(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null boolean hasSubs = false; // Run subscriptions if (subscriptions != null) { hasSubs = true; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message1); - } + synchrony.publish(subscriptions, message1); } // publish to var arg, only if not already an array (because that would be unnecessary) if (varArgPossibility.get() && !isArray) { - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subManager); // CAN NOT RETURN NULL Subscription sub; int length = varArgSubs.length; @@ -80,6 +77,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message1; + synchrony.publish(varArgSubs, asArray); + + for (int i = 0; i < length; i++) { sub = varArgSubs[i]; sub.publish(asArray); @@ -88,8 +88,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) - final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass, - subscriber); // CAN NOT RETURN NULL + final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass, subManager); // CAN NOT RETURN NULL length = varArgSuperSubs.length; @@ -111,7 +110,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // only get here if there were no other subscriptions // Dead Event must EXACTLY MATCH (no subclasses) if (!hasSubs) { - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -132,14 +131,14 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { @Override public - void publish(final Object message1, final Object message2) { + void publish(final Synchrony synchrony, final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null + final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2); // can return null // lock.unlockRead(stamp); boolean hasSubs = false; @@ -160,7 +159,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2) { // stamp = lock.readLock(); - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subManager); // can NOT return null // lock.unlockRead(stamp); final int length = varArgSubs.length; @@ -181,8 +180,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) // stamp = lock.readLock(); - final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, - subscriber); // CAN NOT RETURN NULL + final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, subManager); // CAN NOT RETURN NULL // lock.unlockRead(stamp); @@ -210,7 +208,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) // lock.unlockRead(stamp); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -232,7 +230,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { @Override public - void publish(final Object message1, final Object message2, final Object message3) { + void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); @@ -240,7 +238,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // final StampedLock lock = this.lock; // long stamp = lock.readLock(); - final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null + final Subscription[] subs = subManager.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null // lock.unlockRead(stamp); @@ -263,7 +261,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { // stamp = lock.readLock(); - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subManager); // can NOT return null // lock.unlockRead(stamp); final int length = varArgSubs.length; @@ -287,7 +285,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) // stamp = lock.readLock(); final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3, - subscriber); // CAN NOT RETURN NULL + subManager); // CAN NOT RETURN NULL // lock.unlockRead(stamp); @@ -316,7 +314,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) // stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null // lock.unlockRead(stamp); if (deadSubscriptions != null) { @@ -338,7 +336,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { @Override public - void publish(final Object[] messages) { - publish((Object) messages); + void publish(final Synchrony synchrony, final Object[] messages) { + publish(synchrony, (Object) messages); } } diff --git a/src/com/lmax/disruptor/EventBusFactory.java b/src/dorkbox/util/messagebus/publication/disruptor/EventBusFactory.java similarity index 73% rename from src/com/lmax/disruptor/EventBusFactory.java rename to src/dorkbox/util/messagebus/publication/disruptor/EventBusFactory.java index e8628aa..3331bda 100644 --- a/src/com/lmax/disruptor/EventBusFactory.java +++ b/src/dorkbox/util/messagebus/publication/disruptor/EventBusFactory.java @@ -1,6 +1,6 @@ -package com.lmax.disruptor; +package dorkbox.util.messagebus.publication.disruptor; -import dorkbox.util.messagebus.MessageHolder; +import com.lmax.disruptor.EventFactory; /** * @author dorkbox, llc diff --git a/src/dorkbox/util/messagebus/MessageHandler.java b/src/dorkbox/util/messagebus/publication/disruptor/MessageHandler.java similarity index 71% rename from src/dorkbox/util/messagebus/MessageHandler.java rename to src/dorkbox/util/messagebus/publication/disruptor/MessageHandler.java index 2649cbc..636df1d 100644 --- a/src/dorkbox/util/messagebus/MessageHandler.java +++ b/src/dorkbox/util/messagebus/publication/disruptor/MessageHandler.java @@ -1,7 +1,8 @@ -package dorkbox.util.messagebus; +package dorkbox.util.messagebus.publication.disruptor; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.WorkHandler; +import dorkbox.util.messagebus.synchrony.Synchrony; import dorkbox.util.messagebus.publication.Publisher; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,12 +14,15 @@ import java.util.concurrent.atomic.AtomicBoolean; public class MessageHandler implements WorkHandler, LifecycleAware { private final Publisher publisher; + private final Synchrony syncPublication; AtomicBoolean shutdown = new AtomicBoolean(false); + public - MessageHandler(Publisher publisher) { + MessageHandler(final Publisher publisher, final Synchrony syncPublication) { this.publisher = publisher; + this.syncPublication = syncPublication; } @Override @@ -27,28 +31,25 @@ class MessageHandler implements WorkHandler, LifecycleAware { final int messageType = event.type; switch (messageType) { case MessageType.ONE: { - Object message1 = event.message1; -// System.err.println("(" + sequence + ")" + message1); -// this.workProcessor.release(sequence); - this.publisher.publish(message1); + this.publisher.publish(syncPublication, event.message1); return; } case MessageType.TWO: { Object message1 = event.message1; Object message2 = event.message2; - this.publisher.publish(message1, message2); + this.publisher.publish(syncPublication, message1, message2); return; } case MessageType.THREE: { Object message1 = event.message1; Object message2 = event.message2; Object message3 = event.message3; - this.publisher.publish(message1, message2, message3); + this.publisher.publish(syncPublication, message3, message1, message2); return; } case MessageType.ARRAY: { Object[] messages = event.messages; - this.publisher.publish(messages); + this.publisher.publish(syncPublication, messages); return; } } diff --git a/src/dorkbox/util/messagebus/MessageHolder.java b/src/dorkbox/util/messagebus/publication/disruptor/MessageHolder.java similarity index 65% rename from src/dorkbox/util/messagebus/MessageHolder.java rename to src/dorkbox/util/messagebus/publication/disruptor/MessageHolder.java index 944afba..efbd54e 100644 --- a/src/dorkbox/util/messagebus/MessageHolder.java +++ b/src/dorkbox/util/messagebus/publication/disruptor/MessageHolder.java @@ -1,4 +1,6 @@ -package dorkbox.util.messagebus; +package dorkbox.util.messagebus.publication.disruptor; + +import dorkbox.util.messagebus.subscription.Subscription; /** * @author dorkbox, llc Date: 2/2/15 @@ -6,6 +8,7 @@ package dorkbox.util.messagebus; public class MessageHolder { public int type = MessageType.ONE; + public Subscription[] subscriptions; public Object message1 = null; public Object message2 = null; diff --git a/src/dorkbox/util/messagebus/MessageType.java b/src/dorkbox/util/messagebus/publication/disruptor/MessageType.java similarity index 89% rename from src/dorkbox/util/messagebus/MessageType.java rename to src/dorkbox/util/messagebus/publication/disruptor/MessageType.java index efe4512..71a94fe 100644 --- a/src/dorkbox/util/messagebus/MessageType.java +++ b/src/dorkbox/util/messagebus/publication/disruptor/MessageType.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.util.messagebus; +package dorkbox.util.messagebus.publication.disruptor; -final class MessageType { +public final class MessageType { public static final int ONE = 1; public static final int TWO = 2; public static final int THREE = 3; diff --git a/src/com/lmax/disruptor/PublicationExceptionHandler.java b/src/dorkbox/util/messagebus/publication/disruptor/PublicationExceptionHandler.java similarity index 92% rename from src/com/lmax/disruptor/PublicationExceptionHandler.java rename to src/dorkbox/util/messagebus/publication/disruptor/PublicationExceptionHandler.java index c80e902..0a3ab61 100644 --- a/src/com/lmax/disruptor/PublicationExceptionHandler.java +++ b/src/dorkbox/util/messagebus/publication/disruptor/PublicationExceptionHandler.java @@ -1,5 +1,6 @@ -package com.lmax.disruptor; +package dorkbox.util.messagebus.publication.disruptor; +import com.lmax.disruptor.ExceptionHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; diff --git a/src/dorkbox/util/messagebus/subscription/Subscriber.java b/src/dorkbox/util/messagebus/subscription/Subscriber.java deleted file mode 100644 index 2eb8c65..0000000 --- a/src/dorkbox/util/messagebus/subscription/Subscriber.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.subscription; - -import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.MessageHandler; -import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.utils.ClassUtils; -import dorkbox.util.messagebus.utils.SubscriptionUtils; -import dorkbox.util.messagebus.utils.VarArgUtils; - -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted - */ -@SuppressWarnings("Duplicates") -public -class Subscriber { - public static final float LOAD_FACTOR = 0.8F; - - private final ErrorHandlingSupport errorHandler; - - private final SubscriptionUtils subUtils; - private final VarArgUtils varArgUtils; - - // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required - // this is the primary list for dispatching a specific message - // write access is synchronized and happens only when a listener of a specific class is registered the first time - final ConcurrentMap, ArrayList> subscriptionsPerMessageSingle; - private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - - // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - final AtomicBoolean varArgPossibility = new AtomicBoolean(false); - - ThreadLocal> listCache = new ThreadLocal>() { - @Override - protected - ArrayList initialValue() { - return new ArrayList(8); - } - }; - - - - public - Subscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { - this.errorHandler = errorHandler; - - this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(); - - this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR); - - // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR); - } - - public - AtomicBoolean getVarArgPossibility() { - return varArgPossibility; - } - - public - VarArgUtils getVarArgUtils() { - return varArgUtils; - } - - public - void clear() { - this.subUtils.clear(); - this.varArgUtils.clear(); - } - - // inside a write lock - // add this subscription to each of the handled types - // to activate this sub for publication - private - void registerMulti(final Subscription subscription, final Class listenerClass, - final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, final AtomicBoolean varArgPossibility) { - - final MessageHandler handler = subscription.getHandler(); - final Class[] messageHandlerTypes = handler.getHandledMessages(); - final int size = messageHandlerTypes.length; - - final Class type0 = messageHandlerTypes[0]; - - switch (size) { - case 0: { - // TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method? - errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass); - return; - } - case 1: { - // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types - final ArrayList cachedSubs = listCache.get(); - ArrayList subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs); - if (subs == null) { - listCache.set(new ArrayList(8)); - subs = cachedSubs; - - // is this handler able to accept var args? - if (handler.getVarArgClass() != null) { - varArgPossibility.lazySet(true); - } - } - - subs.add(subscription); - return; - } - case 2: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); - } - - subs.add(subscription); - return; - } - case 3: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); - } - - subs.add(subscription); - return; - } - default: { - ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, messageHandlerTypes); - } - - subs.add(subscription); - } - } - } - - - public - void shutdown() { - this.subscriptionsPerMessageSingle.clear(); - this.subscriptionsPerMessageMulti.clear(); - - clear(); - } - - public - ArrayList getExactAsArray(final Class messageClass) { - return subscriptionsPerMessageSingle.get(messageClass); - } - - public - ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { - return subscriptionsPerMessageMulti.get(messageClass1, messageClass2); - } - - public - ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); - } - - // can return null - public - Subscription[] getExact(final Class messageClass) { - final ArrayList collection = getExactAsArray(messageClass); - - if (collection != null) { - // convert to Array because the subscriptions can change and we want safe iteration over the list - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - public - Subscription[] getExact(final Class messageClass1, final Class messageClass2) { - final ArrayList collection = getExactAsArray(messageClass1, messageClass2); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - public - Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - - final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - public - Subscription[] getExactAndSuper(final Class messageClass) { - ArrayList collection = getExactAsArray(messageClass); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { - ArrayList collection = getExactAsArray(messageClass1, messageClass2); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, - this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - - ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, - this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } -} diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index c98e21b..1bca471 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -183,7 +183,7 @@ class Subscription { } public - void publishToSubscription(final Object... messages) throws Throwable { + void publish(final Object... messages) throws Throwable { final MethodAccess handler = this.handler.getHandler(); final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 7a18b41..d0c9d3d 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -15,14 +15,23 @@ */ package dorkbox.util.messagebus.subscription; +import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.utils.ClassUtils; +import dorkbox.util.messagebus.utils.SubscriptionUtils; +import dorkbox.util.messagebus.utils.VarArgUtils; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; - +/** + * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted + */ /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known @@ -34,6 +43,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public final class SubscriptionManager { + public static final float LOAD_FACTOR = 0.8F; + + // remember already processed classes that do not contain any message handlers private final Map, Boolean> nonListeners; @@ -44,27 +56,67 @@ class SubscriptionManager { private final ConcurrentMap, Subscription[]> subscriptionsPerListener; - private final Subscriber subscriber; + private final ErrorHandlingSupport errorHandler; + + private final SubscriptionUtils subUtils; + private final VarArgUtils varArgUtils; + + // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required + // this is the primary list for dispatching a specific message + // write access is synchronized and happens only when a listener of a specific class is registered the first time + final ConcurrentMap, List> subscriptionsPerMessageSingle; + private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; + + // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) + final AtomicBoolean varArgPossibility = new AtomicBoolean(false); + + ThreadLocal> listCache = new ThreadLocal>() { + @Override + protected + List initialValue() { + return new CopyOnWriteArrayList(); + } + }; + private final ClassUtils classUtils; + +//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to +// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the +// correct type too) public - SubscriptionManager(final int numberOfThreads, final Subscriber subscriber) { - this.subscriber = subscriber; + SubscriptionManager(final int numberOfThreads, final ErrorHandlingSupport errorHandler) { + this.errorHandler = errorHandler; + classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR); // modified ONLY during SUB/UNSUB - this.nonListeners = JavaVersionAdapter.concurrentMap(4, Subscriber.LOAD_FACTOR, numberOfThreads); + this.nonListeners = JavaVersionAdapter.concurrentMap(4, LOAD_FACTOR, numberOfThreads); - // only used during SUB/UNSUB, in a rw lock - this.subscriptionsPerListener = JavaVersionAdapter.concurrentMap(32, Subscriber.LOAD_FACTOR, 1); + // only used during SUB/UNSUB + this.subscriptionsPerListener = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads); + + + this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(); + + this.subUtils = new SubscriptionUtils(classUtils, numberOfThreads, LOAD_FACTOR); + + // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance of handlers + this.varArgUtils = new VarArgUtils(classUtils, numberOfThreads, LOAD_FACTOR); } public void shutdown() { this.nonListeners.clear(); - subscriber.shutdown(); + this.subscriptionsPerMessageSingle.clear(); + this.subscriptionsPerMessageMulti.clear(); + this.subscriptionsPerListener.clear(); + this.classUtils.clear(); + clear(); } public @@ -73,6 +125,35 @@ class SubscriptionManager { return; } + // when subscribing, this is a GREAT opportunity to figure out the classes/objects loaded -- their hierarchy, AND generate UUIDs + // for each CLASS that can be accessed. This then lets us lookup a UUID for each object that comes in -- if an ID is found (for + // any part of it's object hierarchy) -- it means that we have that listeners for that object. this is MUCH faster checking if + // we have subscriptions first (and failing). + // + // so during subscribe we can check "getUUID for all parameter.class accessed by this listener" -> then during publish "lookup + // UUID of incoming message.class" (+ it's super classes, if necessary) -> then check if UUID exists. If yes, then we know there + // are subs. if no - then it's a dead message. + // + // This lets us accomplish TWO things + // 1) be able quickly determine if there are dead messages + // 2) be able to create "multi-class" UUIDs, when two+ classes are represented (always) by the same UUID, by a clever mixing of + // the classes individual UUIDs. + // + // The generation of UUIDs happens ONLY during subscribe, and during publish they are looked up. This UUID can be a simple + // AtomicInteger that starts a MIN_VALUE and count's up. + + + // note: we can do PRE-STARTUP instrumentation (ie, BEFORE any classes are loaded by the classloader) and inject the UUID into + // every object (as a public static final field), then use reflection to look up this value. It would go something like this: + // 1) scan every class for annotations that match + // 2) for each method that has our annotation -- get the list of classes + hierarchy that are the parameters for the method + // 3) inject the UUID field into each class object that #2 returns, only if it doesn't already exist. use invalid field names + // (ie: start with numbers or ? or ^ or something + // + // then during SUB/UNSUB/PUB, we use this UUID for everything (and we can have multi-UUID lookups for the 'multi-arg' thing). + // If there is no UUID, then we just abort the SUB/UNSUB or send a deadmessage + + final Class listenerClass = listener.getClass(); if (this.nonListeners.containsKey(listenerClass)) { @@ -81,7 +162,7 @@ class SubscriptionManager { } // these are concurrent collections - subscriber.clear(); + clear(); // this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist. // ONCE subscriptions are in THIS map, they are considered AVAILABLE. @@ -103,7 +184,7 @@ class SubscriptionManager { - final AtomicBoolean varArgPossibility = subscriber.varArgPossibility; + final AtomicBoolean varArgPossibility = this.varArgPossibility; Subscription subscription; MessageHandler messageHandler; @@ -111,7 +192,7 @@ class SubscriptionManager { Class handlerType; // create the subscriptions - final ConcurrentMap, ArrayList> subsPerMessageSingle = subscriber.subscriptionsPerMessageSingle; + final ConcurrentMap, List> subsPerMessageSingle = this.subscriptionsPerMessageSingle; subscriptions = new Subscription[handlersSize]; for (int i = 0; i < handlersSize; i++) { @@ -129,10 +210,10 @@ class SubscriptionManager { handlerType = messageHandlerTypes[0]; // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types - final ArrayList cachedSubs = subscriber.listCache.get(); - ArrayList subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs); + final List cachedSubs = listCache.get(); + List subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs); if (subs == null) { - subscriber.listCache.set(new ArrayList(8)); + listCache.set(new CopyOnWriteArrayList()); } // create the subscription. This can be thrown away if the subscription succeeds in another thread @@ -144,11 +225,10 @@ class SubscriptionManager { // now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions - // putIfAbsent final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub if (previousSubs != null) { // another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure - // all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUT the correct list again. + // all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUCT the correct list again. // This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle. // since nothing is yet "subscribed" we can assign the correct values for everything now @@ -195,7 +275,7 @@ class SubscriptionManager { } // these are concurrent collections - subscriber.clear(); + clear(); final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { @@ -207,4 +287,248 @@ class SubscriptionManager { } } } + + + public + AtomicBoolean getVarArgPossibility() { + return varArgPossibility; + } + + public + VarArgUtils getVarArgUtils() { + return varArgUtils; + } + + public + void clear() { + this.subUtils.clear(); + this.varArgUtils.clear(); + } + + // inside a write lock + // add this subscription to each of the handled types + // to activate this sub for publication + private + void registerMulti(final Subscription subscription, final Class listenerClass, + final Map, List> subsPerMessageSingle, + final HashMapTree, ArrayList> subsPerMessageMulti, final AtomicBoolean varArgPossibility) { + + final MessageHandler handler = subscription.getHandler(); + final Class[] messageHandlerTypes = handler.getHandledMessages(); + final int size = messageHandlerTypes.length; + + final Class type0 = messageHandlerTypes[0]; + + switch (size) { + case 0: { + // TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method? + errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass); + return; + } + case 1: { + // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types + final List cachedSubs = listCache.get(); + List subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs); + if (subs == null) { + listCache.set(new CopyOnWriteArrayList()); +// listCache.set(new ArrayList(8)); + subs = cachedSubs; + + // is this handler able to accept var args? + if (handler.getVarArgClass() != null) { + varArgPossibility.lazySet(true); + } + } + + subs.add(subscription); + return; + } + case 2: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); + } + + subs.add(subscription); + return; + } + case 3: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); + } + + subs.add(subscription); + return; + } + default: { + ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, messageHandlerTypes); + } + + subs.add(subscription); + } + } + } + + + + public + List getExactAsArray(final Class messageClass) { + return subscriptionsPerMessageSingle.get(messageClass); + } + + public + ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2); + } + + public + ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); + } + + // can return null + public + Subscription[] getExact(final Class messageClass) { + final List collection = getExactAsArray(messageClass); + + if (collection != null) { + // convert to Array because the subscriptions can change and we want safe iteration over the list + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExact(final Class messageClass1, final Class messageClass2) { + final ArrayList collection = getExactAsArray(messageClass1, messageClass2); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass) { + List collection = getExactAsArray(messageClass); // can return null + + // now publish superClasses + final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubscriptions.isEmpty()) { + collection.addAll(superSubscriptions); + } + } + else if (!superSubscriptions.isEmpty()) { + collection = superSubscriptions; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { + ArrayList collection = getExactAsArray(messageClass1, messageClass2); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + public + Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } } diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java new file mode 100644 index 0000000..a3d4458 --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java @@ -0,0 +1,150 @@ +package dorkbox.util.messagebus.synchrony; + +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; + +/** + * + */ +public +class AsyncABQ implements Synchrony { + + private final ErrorHandlingSupport errorHandler; + private final ArrayBlockingQueue dispatchQueue; + private final Collection threads; + + /** + * Notifies the consumers during shutdown, that it's on purpose. + */ + private volatile boolean shuttingDown = false; + + + public + AsyncABQ(final int numberOfThreads, + final ErrorHandlingSupport errorHandler, + final Publisher publisher, + final Synchrony syncPublication) { + + this.errorHandler = errorHandler; + this.dispatchQueue = new ArrayBlockingQueue(1024); + + this.threads = new ArrayDeque(numberOfThreads); + final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); + for (int i = 0; i < numberOfThreads; i++) { + + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @Override + public + void run() { + final ArrayBlockingQueue IN_QUEUE = AsyncABQ.this.dispatchQueue; + final Publisher publisher1 = publisher; + final Synchrony syncPublication1 = syncPublication; + + while (!AsyncABQ.this.shuttingDown) { + try { + //noinspection InfiniteLoopStatement + while (true) { + final Object take = IN_QUEUE.take(); + publisher1.publish(syncPublication1, take); + } + } catch (InterruptedException e) { + if (!AsyncABQ.this.shuttingDown) { +// Integer type = (Integer) MultiNode.lpMessageType(node); +// switch (type) { +// case 1: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// break; +// } +// case 2: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node))); +// break; +// } +// case 3: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node), +// MultiNode.lpItem3(node))); +// break; +// } +// default: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// } +// } + } + } + } + } + }; + + Thread runner = threadFactory.newThread(runnable); + this.threads.add(runner); + } + } + + public + void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + this.dispatchQueue.put(message1); + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable { + + } + + public + void start() { + if (shuttingDown) { + throw new Error("Unable to restart the MessageBus"); + } + + for (Thread t : this.threads) { + t.start(); + } + } + + public + void shutdown() { + this.shuttingDown = true; + + for (Thread t : this.threads) { + t.interrupt(); + } + } + + public + boolean hasPendingMessages() { + return !this.dispatchQueue.isEmpty(); + } +} diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java new file mode 100644 index 0000000..940f5d6 --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java @@ -0,0 +1,180 @@ +package dorkbox.util.messagebus.synchrony; + +import com.lmax.disruptor.LiteBlockingWaitStrategy; +import com.lmax.disruptor.PhasedBackoffWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.Sequencer; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.WorkProcessor; +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.publication.disruptor.EventBusFactory; +import dorkbox.util.messagebus.publication.disruptor.MessageHandler; +import dorkbox.util.messagebus.publication.disruptor.MessageHolder; +import dorkbox.util.messagebus.publication.disruptor.MessageType; +import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +public +class AsyncDisruptor implements Synchrony { + + private final ErrorHandlingSupport errorHandler; + private WorkProcessor[] workProcessors; + private MessageHandler[] handlers; + private RingBuffer ringBuffer; + private Sequence workSequence; + + public + AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Publisher publisher, final Synchrony syncPublication) { + this.errorHandler = errorHandler; + // Now we setup the disruptor and work handlers + + ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, + 0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter + new java.util.concurrent.LinkedTransferQueue(), + new NamedThreadFactory("MessageBus")); + + final PublicationExceptionHandler exceptionHandler = new PublicationExceptionHandler(errorHandler); + EventBusFactory factory = new EventBusFactory(); + + // setup the work handlers + handlers = new MessageHandler[numberOfThreads]; + for (int i = 0; i < handlers.length; i++) { + handlers[i] = new MessageHandler(publisher, syncPublication); // exactly one per thread is used + } + + +// final int BUFFER_SIZE = ringBufferSize * 64; +// final int BUFFER_SIZE = 1024 * 64; +// final int BUFFER_SIZE = 1024; + final int BUFFER_SIZE = 32; +// final int BUFFER_SIZE = 16; +// final int BUFFER_SIZE = 8; +// final int BUFFER_SIZE = 4; + + + WaitStrategy consumerWaitStrategy; +// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one +// consumerWaitStrategy = new BlockingWaitStrategy(); +// consumerWaitStrategy = new YieldingWaitStrategy(); +// consumerWaitStrategy = new BusySpinWaitStrategy(); +// consumerWaitStrategy = new SleepingWaitStrategy(); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); + consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); + + + ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + + // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item + final int numWorkers = handlers.length; + workProcessors = new WorkProcessor[numWorkers]; + workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + for (int i = 0; i < numWorkers; i++) { + workProcessors[i] = new WorkProcessor(ringBuffer, + sequenceBarrier, + handlers[i], + exceptionHandler, workSequence); + } + + // setup the WorkProcessor sequences (control what is consumed from the ring buffer) + final Sequence[] sequences = getSequences(); + ringBuffer.addGatingSequences(sequences); + + + // configure the start position for the WorkProcessors, and start them + final long cursor = ringBuffer.getCursor(); + workSequence.set(cursor); + + for (WorkProcessor processor : workProcessors) { + processor.getSequence() + .set(cursor); + executor.execute(processor); + } + } + + + public + void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + long seq = ringBuffer.next(); + + MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.ONE; + job.subscriptions = subscriptions; + job.message1 = message1; + + ringBuffer.publish(seq); + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable { + + } + + // gets the sequences used for processing work + private + Sequence[] getSequences() { + final Sequence[] sequences = new Sequence[workProcessors.length + 1]; + for (int i = 0, size = workProcessors.length; i < size; i++) { + sequences[i] = workProcessors[i].getSequence(); + } + sequences[sequences.length - 1] = workSequence; // always add the work sequence + return sequences; + } + + + public + void start() { + } + + public + void shutdown() { + for (WorkProcessor processor : workProcessors) { + processor.halt(); + } + + for (MessageHandler handler : handlers) { + while (!handler.isShutdown()) { + LockSupport.parkNanos(100L); // wait 100ms for handlers to quit + } + } + } + + public + boolean hasPendingMessages() { + // from workerPool.drainAndHalt() + Sequence[] workerSequences = getSequences(); + final long cursor = ringBuffer.getCursor(); + for (Sequence s : workerSequences) { + if (cursor > s.get()) { + return true; + } + } + + return false; + } +} diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncLTQ.java b/src/dorkbox/util/messagebus/synchrony/AsyncLTQ.java new file mode 100644 index 0000000..b82ab9a --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/AsyncLTQ.java @@ -0,0 +1,148 @@ +package dorkbox.util.messagebus.synchrony; + +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.concurrent.LinkedTransferQueue; + +/** + * + */ +public +class AsyncLTQ implements Synchrony { + private final ErrorHandlingSupport errorHandler; + private final LinkedTransferQueue dispatchQueue; + private final Collection threads; + /** + * Notifies the consumers during shutdown, that it's on purpose. + */ + private volatile boolean shuttingDown = false; + + + public + AsyncLTQ(final int numberOfThreads, + final ErrorHandlingSupport errorHandler, + final Publisher publisher, + final Synchrony syncPublication) { + + this.errorHandler = errorHandler; + this.dispatchQueue = new LinkedTransferQueue(); + + this.threads = new ArrayDeque(numberOfThreads); + final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); + for (int i = 0; i < numberOfThreads; i++) { + + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @Override + public + void run() { + LinkedTransferQueue IN_QUEUE = AsyncLTQ.this.dispatchQueue; + final Publisher publisher1 = publisher; + final Synchrony syncPublication1 = syncPublication; + + while (!AsyncLTQ.this.shuttingDown) { + try { + //noinspection InfiniteLoopStatement + while (true) { + final Object take = IN_QUEUE.take(); + publisher1.publish(syncPublication1, take); + } + } catch (InterruptedException e) { + if (!AsyncLTQ.this.shuttingDown) { +// Integer type = (Integer) MultiNode.lpMessageType(node); +// switch (type) { +// case 1: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// break; +// } +// case 2: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node))); +// break; +// } +// case 3: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node), +// MultiNode.lpItem3(node))); +// break; +// } +// default: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// } +// } + } + } + } + } + }; + + Thread runner = threadFactory.newThread(runnable); + this.threads.add(runner); + } + } + + public + void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + this.dispatchQueue.transfer(message1); + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable { + + } + + public + void start() { + if (shuttingDown) { + throw new Error("Unable to restart the MessageBus"); + } + + for (Thread t : this.threads) { + t.start(); + } + } + + public + void shutdown() { + this.shuttingDown = true; + + for (Thread t : this.threads) { + t.interrupt(); + } + } + + public + boolean hasPendingMessages() { + return !this.dispatchQueue.isEmpty(); + } +} diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncMisc.java b/src/dorkbox/util/messagebus/synchrony/AsyncMisc.java new file mode 100644 index 0000000..6abfed9 --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/AsyncMisc.java @@ -0,0 +1,184 @@ +package dorkbox.util.messagebus.synchrony; + +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.publication.Publisher; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.concurrent.LinkedTransferQueue; + +/** + * + */ +public +class AsyncMisc implements Synchrony { + /** + * Notifies the consumers during shutdown, that it's on purpose. + */ + private volatile boolean shuttingDown = false; + + + // private final LinkedBlockingQueue dispatchQueue; +// private final ArrayBlockingQueue dispatchQueue; + private final LinkedTransferQueue dispatchQueue; + private final Collection threads; + + + public + AsyncMisc(final int numberOfThreads, + final ErrorHandlingSupport errorHandler, + final Publisher publisher, + final Synchrony syncPublication) { + +// this.dispatchQueue = new LinkedBlockingQueue(1024); +// this.dispatchQueue = new ArrayBlockingQueue(1024); + this.dispatchQueue = new LinkedTransferQueue(); + + this.threads = new ArrayDeque(numberOfThreads); + final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); + for (int i = 0; i < numberOfThreads; i++) { + + // each thread will run forever and process incoming message publication requests + Runnable runnable = new Runnable() { + @Override + public + void run() { +// LinkedBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; +// ArrayBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; + LinkedTransferQueue IN_QUEUE = AsyncMisc.this.dispatchQueue; + +// MultiNode node = new MultiNode(); + while (!AsyncMisc.this.shuttingDown) { + try { + //noinspection InfiniteLoopStatement + while (true) { +// IN_QUEUE.take(node); + final Object take = IN_QUEUE.take(); +// Integer type = (Integer) MultiNode.lpMessageType(node); +// switch (type) { +// case 1: { +// publish(take); +// break; +// } +// case 2: { +// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node)); +// break; +// } +// case 3: { +// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node)); +// break; +// } +// default: { +// publish(MultiNode.lpItem1(node)); +// } +// } + } + } catch (InterruptedException e) { + if (!AsyncMisc.this.shuttingDown) { +// Integer type = (Integer) MultiNode.lpMessageType(node); +// switch (type) { +// case 1: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// break; +// } +// case 2: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node))); +// break; +// } +// case 3: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node), +// MultiNode.lpItem2(node), +// MultiNode.lpItem3(node))); +// break; +// } +// default: { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Thread interrupted while processing message") +// .setCause(e) +// .setPublishedObject(MultiNode.lpItem1(node))); +// } +// } + } + } + } + } + }; + + Thread runner = threadFactory.newThread(runnable); + this.threads.add(runner); + } + } + + public + void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + +// try { +// this.dispatchQueue.transfer(message); +//// this.dispatchQueue.put(message); +// } catch (Exception e) { +// errorHandler.handlePublicationError(new PublicationError().setMessage( +// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message)); +// } + + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1); + } + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable { + + } + + public + void start() { + if (shuttingDown) { + throw new Error("Unable to restart the MessageBus"); + } + + for (Thread t : this.threads) { + t.start(); + } + } + + public + void shutdown() { + this.shuttingDown = true; + + for (Thread t : this.threads) { + t.interrupt(); + } + } + + public + boolean hasPendingMessages() { + return !this.dispatchQueue.isEmpty(); + } +} diff --git a/src/dorkbox/util/messagebus/synchrony/Sync.java b/src/dorkbox/util/messagebus/synchrony/Sync.java new file mode 100644 index 0000000..32611ea --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/Sync.java @@ -0,0 +1,51 @@ +package dorkbox.util.messagebus.synchrony; + +import dorkbox.util.messagebus.subscription.Subscription; + +public +class Sync implements Synchrony { + public + void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1); + } + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + + } + + @Override + public + void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable { + + } + + @Override + public + void start() { + + } + + @Override + public + void shutdown() { + + } + + @Override + public + boolean hasPendingMessages() { + return false; + } +} diff --git a/src/dorkbox/util/messagebus/synchrony/Synchrony.java b/src/dorkbox/util/messagebus/synchrony/Synchrony.java new file mode 100644 index 0000000..3c46e3e --- /dev/null +++ b/src/dorkbox/util/messagebus/synchrony/Synchrony.java @@ -0,0 +1,18 @@ +package dorkbox.util.messagebus.synchrony; + +import dorkbox.util.messagebus.subscription.Subscription; + +/** + * + */ +public +interface Synchrony { + void publish(final Subscription[] subscriptions, Object message1) throws Throwable; + void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ; + void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ; + void publish(final Subscription[] subscriptions, Object[] messages) throws Throwable ; + + void start(); + void shutdown(); + boolean hasPendingMessages(); +} diff --git a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java index 8cf2bca..fd06957 100644 --- a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java +++ b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java @@ -17,10 +17,11 @@ package dorkbox.util.messagebus.utils; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; import java.util.ArrayList; +import java.util.List; import java.util.Map; public final @@ -35,13 +36,13 @@ class SubscriptionUtils { public - SubscriptionUtils(final ClassUtils superClass, final float loadFactor) { + SubscriptionUtils(final ClassUtils superClass, final int numberOfThreads, final float loadFactor) { this.superClass = superClass; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, 1); + this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, numberOfThreads); this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>(); } @@ -62,7 +63,7 @@ class SubscriptionUtils { * @return CAN NOT RETURN NULL */ public - ArrayList getSuperSubscriptions(final Class clazz, final Subscriber subscriber) { + ArrayList getSuperSubscriptions(final Class clazz, final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.superClassSubscriptions; @@ -75,7 +76,7 @@ class SubscriptionUtils { final Class[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response Class superClass; - ArrayList superSubs; + List superSubs; Subscription sub; final int length = superClasses.length; @@ -84,7 +85,7 @@ class SubscriptionUtils { for (int i = 0; i < length; i++) { superClass = superClasses[i]; - superSubs = subscriber.getExactAsArray(superClass); + superSubs = subManager.getExactAsArray(superClass); if (superSubs != null) { superSubLength = superSubs.size(); @@ -115,7 +116,7 @@ class SubscriptionUtils { * @return CAN NOT RETURN NULL */ public - ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Subscriber subscriber) { + ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> cached = this.superClassSubscriptionsMulti; @@ -154,7 +155,7 @@ class SubscriptionUtils { continue; } - superSubs = subscriber.getExactAsArray(superClass1, superClass2); + superSubs = subManager.getExactAsArray(superClass1, superClass2); if (superSubs != null) { for (int k = 0; k < superSubs.size(); k++) { sub = superSubs.get(k); @@ -184,7 +185,7 @@ class SubscriptionUtils { */ public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Class clazz3, - final Subscriber subscriber) { + final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; @@ -234,7 +235,7 @@ class SubscriptionUtils { continue; } - superSubs = subscriber.getExactAsArray(superClass1, superClass2, superClass3); + superSubs = subManager.getExactAsArray(superClass1, superClass2, superClass3); if (superSubs != null) { for (int m = 0; m < superSubs.size(); m++) { sub = superSubs.get(m); diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java index 6960a7b..e76f311 100644 --- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java +++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java @@ -18,10 +18,11 @@ package dorkbox.util.messagebus.utils; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; import java.util.ArrayList; +import java.util.List; import java.util.Map; public final @@ -36,14 +37,14 @@ class VarArgUtils { public - VarArgUtils(final ClassUtils superClassUtils, final float loadFactor) { + VarArgUtils(final ClassUtils superClassUtils, final int numberOfThreads, final float loadFactor) { this.superClassUtils = superClassUtils; - this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1); + this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads); this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(); - this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1); + this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads); this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(); } @@ -62,7 +63,7 @@ class VarArgUtils { // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions public - Subscription[] getVarArgSubscriptions(final Class messageClass, final Subscriber subscriber) { + Subscription[] getVarArgSubscriptions(final Class messageClass, final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.varArgSubscriptionsSingle; @@ -72,7 +73,7 @@ class VarArgUtils { // this gets (and caches) our array type. This is never cleared. final Class arrayVersion = this.superClassUtils.getArrayClass(messageClass); - final ArrayList subs = subscriber.getExactAsArray(arrayVersion); + final List subs = subManager.getExactAsArray(arrayVersion); if (subs != null) { final int length = subs.size(); varArgSubs = new ArrayList(length); @@ -104,8 +105,8 @@ class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions public - Subscription[] getVarArgSuperSubscriptions(final Class messageClass, final Subscriber subscriber) { - final ArrayList subs = getVarArgSuperSubscriptions_List(messageClass, subscriber); + Subscription[] getVarArgSuperSubscriptions(final Class messageClass, final SubscriptionManager subManager) { + final ArrayList subs = getVarArgSuperSubscriptions_List(messageClass, subManager); final Subscription[] returnedSubscriptions = new Subscription[subs.size()]; subs.toArray(returnedSubscriptions); @@ -114,7 +115,7 @@ class VarArgUtils { // CAN NOT RETURN NULL private - ArrayList getVarArgSuperSubscriptions_List(final Class messageClass, final Subscriber subscriber) { + ArrayList getVarArgSuperSubscriptions_List(final Class messageClass, final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.varArgSuperSubscriptionsSingle; @@ -136,13 +137,13 @@ class VarArgUtils { Class type; Subscription sub; - ArrayList subs; + List subs; int length; MessageHandler handlerMetadata; for (int i = 0; i < typesLength; i++) { type = types[i]; - subs = subscriber.getExactAsArray(type); + subs = subManager.getExactAsArray(type); if (subs != null) { length = subs.size(); @@ -173,7 +174,7 @@ class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions public - Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Subscriber subscriber) { + Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; @@ -182,8 +183,8 @@ class VarArgUtils { if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber); - final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber); + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subManager); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subManager); subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); @@ -202,7 +203,7 @@ class VarArgUtils { // and then, returns the array'd version superclass subscriptions public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3, - final Subscriber subscriber) { + final SubscriptionManager subManager) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; @@ -211,9 +212,9 @@ class VarArgUtils { if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber); - final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber); - final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subscriber); + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subManager); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subManager); + final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subManager); subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3); diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 5ed422a..2dea0d8 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -22,15 +22,29 @@ */ package dorkbox.util.messagebus; -import dorkbox.util.messagebus.common.*; -import dorkbox.util.messagebus.common.adapter.StampedLock; +import dorkbox.util.messagebus.common.AssertSupport; +import dorkbox.util.messagebus.common.ConcurrentExecutor; +import dorkbox.util.messagebus.common.ListenerFactory; +import dorkbox.util.messagebus.common.SubscriptionValidator; +import dorkbox.util.messagebus.common.TestUtil; import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.listeners.*; -import dorkbox.util.messagebus.messages.*; -import dorkbox.util.messagebus.subscription.Subscriber; +import dorkbox.util.messagebus.listeners.AbstractMessageListener; +import dorkbox.util.messagebus.listeners.ICountableListener; +import dorkbox.util.messagebus.listeners.IMessageListener; +import dorkbox.util.messagebus.listeners.IMultipartMessageListener; +import dorkbox.util.messagebus.listeners.MessagesListener; +import dorkbox.util.messagebus.listeners.MultipartMessageListener; +import dorkbox.util.messagebus.listeners.Overloading; +import dorkbox.util.messagebus.listeners.StandardMessageListener; +import dorkbox.util.messagebus.messages.AbstractMessage; +import dorkbox.util.messagebus.messages.ICountable; +import dorkbox.util.messagebus.messages.IMessage; +import dorkbox.util.messagebus.messages.IMultipartMessage; +import dorkbox.util.messagebus.messages.MessageTypes; +import dorkbox.util.messagebus.messages.MultipartMessage; +import dorkbox.util.messagebus.messages.StandardMessage; import dorkbox.util.messagebus.subscription.SubscriptionManager; -import dorkbox.util.messagebus.utils.ClassUtils; import org.junit.Test; /** @@ -157,11 +171,8 @@ public class SubscriptionManagerTest extends AssertSupport { ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class); final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final StampedLock lock = new StampedLock(); - final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final Subscriber subscriber = new Subscriber(errorHandler, classUtils); + final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler); - SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles( @@ -181,21 +192,17 @@ public class SubscriptionManagerTest extends AssertSupport { private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) { final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final StampedLock lock = new StampedLock(); - final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final Subscriber subscriber = new Subscriber(errorHandler, classUtils); - - final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber); + final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); - validator.validate(subscriber); + validator.validate(subscriptionManager); ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1); listeners.clear(); - validator.validate(subscriber); + validator.validate(subscriptionManager); } } diff --git a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java index 705e623..ccf3bcd 100644 --- a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -22,8 +22,8 @@ */ package dorkbox.util.messagebus.common; -import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; +import dorkbox.util.messagebus.subscription.SubscriptionManager; import java.util.ArrayDeque; import java.util.Arrays; @@ -60,13 +60,13 @@ public class SubscriptionValidator extends AssertSupport { // match subscriptions with existing validation entries // for each tuple of subscriber and message type the specified number of listeners must exist - public void validate(Subscriber subscriber) { + public void validate(SubscriptionManager subManager) { for (Class messageType : this.messageTypes) { Collection validationEntries = getEntries(messageType); // we split subs + superSubs into TWO calls. Collection collection = new ArrayDeque(8); - Subscription[] subscriptions = subscriber.getExactAndSuper(messageType); + Subscription[] subscriptions = subManager.getExactAndSuper(messageType); if (subscriptions != null) { collection.addAll(Arrays.asList(subscriptions)); }