polish and reasonable performance.

This commit is contained in:
nathan 2016-01-15 23:35:20 +01:00
parent eae63b3f8a
commit e8efa4390b
28 changed files with 1472 additions and 931 deletions

View File

@ -15,34 +15,22 @@
*/
package dorkbox.util.messagebus;
import com.lmax.disruptor.EventBusFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PublicationExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.synchrony.AsyncDisruptor;
import dorkbox.util.messagebus.synchrony.Sync;
import dorkbox.util.messagebus.synchrony.Synchrony;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
* The base class for all message bus implementations with support for asynchronous message dispatch.
*
* See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
* tldr; we use single-writer-principle + Atomic.lazySet
*
* @author dorkbox, llc
* Date: 2/2/15
@ -50,24 +38,13 @@ import java.util.concurrent.locks.LockSupport;
public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
// private final LinkedBlockingQueue<Object> dispatchQueue;
// private final ArrayBlockingQueue<Object> dispatchQueue;
// private final LinkedTransferQueue<Object> dispatchQueue;
// private final Collection<Thread> threads;
private final ClassUtils classUtils;
private final SubscriptionManager subscriptionManager;
private final Publisher publisher;
private final Synchrony syncPublication;
private final Synchrony asyncPublication;
/**
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown = false;
private WorkProcessor[] workProcessors;
private MessageHandler[] handlers;
private RingBuffer<MessageHolder> ringBuffer;
private Sequence workSequence;
/**
* By default, will permit subTypes and Varity Argument matching, and will use half of CPUs available for dispatching async messages
@ -84,7 +61,8 @@ class MessageBus implements IMessageBus {
*/
public
MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads);
// this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads);
this(PublishMode.ExactWithSuperTypes, numberOfThreads);
}
/**
@ -106,196 +84,29 @@ class MessageBus implements IMessageBus {
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
this.errorHandler = new DefaultErrorHandler();
// this.dispatchQueue = new LinkedBlockingQueue<Object>(1024);
// this.dispatchQueue = new ArrayBlockingQueue<Object>(1024);
// this.dispatchQueue = new LinkedTransferQueue<Object>();
classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber;
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
subscriber = new Subscriber(errorHandler, classUtils);
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler);
switch (publishMode) {
case Exact:
publisher = new PublisherExact(errorHandler, subscriber);
publisher = new PublisherExact(errorHandler, subscriptionManager);
break;
case ExactWithSuperTypes:
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber);
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriptionManager);
break;
case ExactWithSuperTypesAndVarity:
default:
publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber);
publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriptionManager);
}
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber);
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
new NamedThreadFactory("MessageBus"));
final PublicationExceptionHandler<MessageHolder> exceptionHandler = new PublicationExceptionHandler<MessageHolder>(errorHandler);
EventBusFactory factory = new EventBusFactory();
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(publisher); // exactly one per thread is used
}
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
// final int BUFFER_SIZE = 16;
final int BUFFER_SIZE = 8;
WaitStrategy consumerWaitStrategy;
consumerWaitStrategy = new LiteBlockingWaitStrategy();
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
final int numWorkers = handlers.length;
workProcessors = new WorkProcessor[numWorkers];
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer,
sequenceBarrier,
handlers[i],
exceptionHandler, workSequence);
}
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence()
.set(cursor);
executor.execute(processor);
}
// this.threads = new ArrayDeque<Thread>(numberOfThreads);
// final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
// for (int i = 0; i < numberOfThreads; i++) {
//
// // each thread will run forever and process incoming message publication requests
// Runnable runnable = new Runnable() {
// @Override
// public
// void run() {
//// LinkedBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//// ArrayBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
// LinkedTransferQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//
// MultiNode node = new MultiNode();
// while (!MessageBus.this.shuttingDown) {
// try {
// //noinspection InfiniteLoopStatement
// while (true) {
//// IN_QUEUE.take(node);
// final Object take = IN_QUEUE.take();
//// Integer type = (Integer) MultiNode.lpMessageType(node);
//// switch (type) {
//// case 1: {
// publish(take);
//// break;
//// }
//// case 2: {
//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node));
//// break;
//// }
//// case 3: {
//// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node));
//// break;
//// }
//// default: {
//// publish(MultiNode.lpItem1(node));
//// }
//// }
// }
// } catch (InterruptedException e) {
// if (!MessageBus.this.shuttingDown) {
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// break;
// }
// case 2: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node)));
// break;
// }
// case 3: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node),
// MultiNode.lpItem3(node)));
// break;
// }
// default: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// }
// }
// }
// }
// }
// }
// };
//
// Thread runner = threadFactory.newThread(runnable);
// this.threads.add(runner);
// }
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[workProcessors.length + 1];
for (int i = 0, size = workProcessors.length; i < size; i++) {
sequences[i] = workProcessors[i].getSequence();
}
sequences[sequences.length - 1] = workSequence; // always add the work sequence
return sequences;
syncPublication = new Sync();
// asyncPublication = new PubAsync(numberOfThreads, errorHandler, publisher, syncPublication);
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, publisher, syncPublication);
}
/**
@ -324,57 +135,31 @@ class MessageBus implements IMessageBus {
@Override
public
void publish(final Object message) {
publisher.publish(message);
publisher.publish(syncPublication, message);
}
@Override
public
void publish(final Object message1, final Object message2) {
publisher.publish(message1, message2);
publisher.publish(syncPublication, message1, message2);
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
publisher.publish(message1, message2, message3);
publisher.publish(syncPublication, message1, message2, message3);
}
@Override
public
void publish(final Object[] messages) {
publisher.publish(messages);
publisher.publish(syncPublication, messages);
}
@Override
public
void publishAsync(final Object message) {
if (message != null) {
final long seq = ringBuffer.next();
try {
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE;
job.message1 = message;
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
else {
throw new NullPointerException("Message cannot be null.");
}
// try {
// this.dispatchQueue.transfer(message);
//// this.dispatchQueue.put(message);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
// }
publisher.publish(asyncPublication, message);
}
@Override
@ -428,18 +213,7 @@ class MessageBus implements IMessageBus {
@Override
public final
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
// return !this.dispatchQueue.isEmpty();
return asyncPublication.hasPendingMessages();
}
@Override
@ -451,36 +225,14 @@ class MessageBus implements IMessageBus {
@Override
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
errorHandler.init();
// for (Thread t : this.threads) {
// t.start();
// }
asyncPublication.start();
}
@Override
public
void shutdown() {
this.shuttingDown = true;
// for (Thread t : this.threads) {
// t.interrupt();
// }
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
this.asyncPublication.shutdown();
this.subscriptionManager.shutdown();
this.classUtils.clear();
}
}

View File

@ -15,6 +15,7 @@
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.publication.disruptor.MessageType;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.UnsafeAccess;

View File

@ -15,6 +15,7 @@
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.publication.disruptor.MessageType;
import org.jctools.util.UnsafeAccess;
abstract class ColdItems {

View File

@ -22,12 +22,11 @@
*/
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.

View File

@ -17,39 +17,57 @@ package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
/**
* Simple tree structure that is a map that contains a chain of keys to publish to a value.
* <p>
* NOT THREAD SAFE, each call must be protected by a read/write lock of some sort
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public class HashMapTree<KEY, VALUE> {
private Map<KEY, HashMapTree<KEY, VALUE>> children;
public static int INITIAL_SIZE = 4;
public static float LOAD_FACTOR = 0.8F;
private static
final ThreadLocal<Object> keyCache = new ThreadLocal<Object>() {
@Override
protected
Object initialValue() {
return JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1);
}
};
private static
final ThreadLocal<Object> valueCache = new ThreadLocal<Object>() {
@Override
protected
Object initialValue() {
return new HashMapTree();
}
};
// Map<KEY, HashMapTree<KEY, VALUE>>
private AtomicReference<Object> children = new AtomicReference<Object>();
private VALUE value;
private final int defaultSize;
private final float loadFactor;
public HashMapTree(final int defaultSize, final float loadFactor) {
this.defaultSize = defaultSize;
this.loadFactor = loadFactor;
@SuppressWarnings("unchecked")
public static <KEY, VALUE> ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> cast(Object o) {
return (ConcurrentMap<KEY, HashMapTree<KEY, VALUE>>) o;
}
/**
* can be overridden to provide a custom backing map
*/
protected Map<KEY, HashMapTree<KEY, VALUE>> createChildren(int defaultSize, float loadFactor) {
return JavaVersionAdapter.concurrentMap(defaultSize, loadFactor, 1);
public HashMapTree() {
}
public final VALUE getValue() {
return this.value;
}
@ -66,143 +84,19 @@ public class HashMapTree<KEY, VALUE> {
public final void clear() {
if (this.children != null) {
Set<Entry<KEY, HashMapTree<KEY, VALUE>>> entrySet = this.children.entrySet();
for (Entry<KEY, HashMapTree<KEY, VALUE>> entry : entrySet) {
entry.getValue().clear();
}
this.children.clear();
this.value = null;
}
// if (this.children != null) {
// Set<Entry<KEY, HashMapTree<KEY, VALUE>>> entrySet = this.children.entrySet();
// for (Entry<KEY, HashMapTree<KEY, VALUE>> entry : entrySet) {
// entry.getValue().clear();
// }
//
// this.children.clear();
// this.value = null;
// }
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key) {
if (key != null) {
removeLeaf(key);
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
HashMapTree<KEY, VALUE> leaf;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
leaf.removeLeaf(key2);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
HashMapTree<KEY, VALUE> leaf;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
leaf.remove(key2, key3);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public final void remove(KEY... keys) {
if (keys == null) {
return;
}
removeLeaf(0, keys);
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private void removeLeaf(KEY key) {
if (key != null) {
if (this.children != null) {
HashMapTree<KEY, VALUE> leaf = this.children.get(key);
if (leaf != null) {
leaf = this.children.get(key);
if (leaf != null) {
if (leaf.children == null && leaf.value == null) {
this.children.remove(key);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
}
}
// keys CANNOT be null here!
private void removeLeaf(int index, KEY[] keys) {
if (index == keys.length) {
// we have reached the leaf to remove!
this.value = null;
this.children = null;
} else if (this.children != null) {
HashMapTree<KEY, VALUE> leaf = this.children.get(keys[index]);
if (leaf != null) {
leaf.removeLeaf(index+1, keys);
if (leaf.children == null && leaf.value == null) {
this.children.remove(keys[index]);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
public final VALUE put(VALUE value, KEY key) {
public final VALUE put(KEY key, VALUE value) {
if (key == null) {
throw new NullPointerException("keys");
}
@ -282,34 +176,62 @@ public class HashMapTree<KEY, VALUE> {
}
private HashMapTree<KEY, VALUE> createLeaf(KEY key) {
/**
* creates a child (if necessary) in an atomic way. The tree returned will either be the current one, or a new one.
*
* @param key the key for the new child
* @return
*/
@SuppressWarnings("unchecked")
private
HashMapTree<KEY, VALUE> createLeaf(KEY key) {
if (key == null) {
return null;
}
HashMapTree<KEY, VALUE> objectTree;
final Object cached = keyCache.get();
final Object checked = children.get();
ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> kids;
if (this.children == null) {
this.children = createChildren(this.defaultSize, this.loadFactor);
if (checked == null) {
final boolean success = children.compareAndSet(null, cached);
if (success) {
keyCache.set(JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1));
kids = cast(cached);
}
else {
kids = cast(children.get());
}
}
else {
kids = cast(checked);
}
objectTree = this.children.get(key);
// make sure we have a tree for the specified node
if (objectTree == null) {
objectTree = new HashMapTree<KEY, VALUE>(this.defaultSize, this.loadFactor);
this.children.put(key, objectTree);
final Object cached2 = valueCache.get();
final HashMapTree<KEY, VALUE> tree = kids.putIfAbsent(key, (HashMapTree<KEY, VALUE>) cached2);
if (tree == null) {
// was absent
valueCache.set(new HashMapTree());
return (HashMapTree<KEY, VALUE>) cached2;
}
else {
return tree;
}
return objectTree;
}
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
public final VALUE get(KEY key) {
if (key == null) {
return null;
@ -317,7 +239,7 @@ public class HashMapTree<KEY, VALUE> {
HashMapTree<KEY, VALUE> objectTree;
// publish value from our children
objectTree = getLeaf(key); // protected by lock
objectTree = getLeaf(key);
if (objectTree == null) {
return null;
@ -331,7 +253,7 @@ public class HashMapTree<KEY, VALUE> {
// publish value from our children
tree = getLeaf(key1); // protected by lock
if (tree != null) {
tree = tree.getLeaf(key2); // protected by lock
tree = tree.getLeaf(key2);
}
if (tree == null) {
@ -392,12 +314,14 @@ public class HashMapTree<KEY, VALUE> {
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
final ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> o = cast(this.children.get());
tree = o.get(key);
}
return tree;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key1, KEY key2) {
HashMapTree<KEY, VALUE> tree;
@ -447,4 +371,135 @@ public class HashMapTree<KEY, VALUE> {
return tree;
}
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key) {
if (key != null) {
removeLeaf(key);
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
HashMapTree<KEY, VALUE> leaf;
if (this.children != null) {
leaf = getLeaf(key1);
if (leaf != null) {
leaf.removeLeaf(key2);
final ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> o = cast(this.children.get());
o.remove(key1);
if (o.isEmpty()) {
this.children.compareAndSet(o, null);
}
}
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public final void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
// HashMapTree<KEY, VALUE> leaf;
// if (this.children != null) {
// leaf = getLeaf(key1);
//
// if (leaf != null) {
// leaf.remove(key2, key3);
// this.children.remove(key1);
//
// if (this.children.isEmpty()) {
// this.children = null;
// }
// }
// }
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public final void remove(KEY... keys) {
if (keys == null) {
return;
}
removeLeaf(0, keys);
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private void removeLeaf(KEY key) {
if (key != null) {
final ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> kids = cast(this.children.get());
if (kids != null) {
kids.remove(key);
if (kids.isEmpty()) {
this.children.compareAndSet(kids, null);
}
}
}
}
// keys CANNOT be null here!
private void removeLeaf(int index, KEY[] keys) {
if (index == keys.length) {
// we have reached the leaf to remove!
this.value = null;
this.children = null;
} else {
final ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> kids = cast(this.children.get());
if (kids != null) {
// HashMapTree<KEY, VALUE> leaf = this.children.get(keys[index]);
// if (leaf != null) {
// leaf.removeLeaf(index + 1, keys);
// if (leaf.children == null && leaf.value == null) {
// this.children.remove(keys[index]);
// }
//
// if (this.children.isEmpty()) {
// this.children = null;
// }
// }
}
}
}
}

View File

@ -22,11 +22,11 @@
*/
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
import java.util.concurrent.locks.StampedLock;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of

View File

@ -15,12 +15,11 @@
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.synchrony.Synchrony;
public interface Publisher {
void publish(Object message1);
void publish(Object message1, Object message2);
void publish(Object message1, Object message2, Object message3);
void publish(Object[] messages);
void publish(final Synchrony synchrony, Object message1);
void publish(final Synchrony synchrony, Object message1, Object message2);
void publish(final Synchrony synchrony, Object message1, Object message2, Object message3);
void publish(final Synchrony synchrony, Object[] messages);
}

View File

@ -18,28 +18,29 @@ package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.synchrony.Synchrony;
@SuppressWarnings("Duplicates")
public
class PublisherExact implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final SubscriptionManager subManager;
public
PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
PublisherExact(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.subManager = subManager;
}
@Override
public
void publish(final Object message1) {
void publish(final Synchrony synchrony, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
final Subscription[] subscriptions = subManager.getExact(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
@ -51,7 +52,7 @@ class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -72,14 +73,14 @@ class PublisherExact implements Publisher {
@Override
public
void publish(final Object message1, final Object message2) {
void publish(final Synchrony synchrony, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null
final Subscription[] subscriptions = subManager.getExact(messageClass1, messageClass2); // can return null
// lock.unlockRead(stamp);
// Run subscriptions
@ -93,7 +94,7 @@ class PublisherExact implements Publisher {
else {
// Dead Event must EXACTLY MATCH (no subclasses)
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -115,7 +116,7 @@ class PublisherExact implements Publisher {
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
@ -123,7 +124,7 @@ class PublisherExact implements Publisher {
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null
final Subscription[] subscriptions = subManager.getExact(messageClass1, messageClass2, messageClass3); // can return null
// lock.unlockRead(stamp);
// Run subscriptions
@ -137,7 +138,7 @@ class PublisherExact implements Publisher {
else {
// Dead Event must EXACTLY MATCH (no subclasses)
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -159,7 +160,7 @@ class PublisherExact implements Publisher {
@Override
public
void publish(final Object[] messages) {
publish((Object) messages);
void publish(final Synchrony synchrony, final Object[] messages) {
publish(synchrony, (Object) messages);
}
}

View File

@ -18,54 +18,45 @@ package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.synchrony.Synchrony;
@SuppressWarnings("Duplicates")
public
class PublisherExactWithSuperTypes implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
// private final StampedLock lock;
private final SubscriptionManager subManager;
public
PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.subManager = subManager;
}
@Override
public
void publish(final Object message1) {
void publish(final Synchrony synchrony, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
synchrony.publish(subscriptions, message1);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1));
}
@ -73,14 +64,14 @@ class PublisherExactWithSuperTypes implements Publisher {
@Override
public
void publish(final Object message1, final Object message2) {
void publish(final Synchrony synchrony, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2); // can return null
// lock.unlockRead(stamp);
// Run subscriptions
@ -94,7 +85,7 @@ class PublisherExactWithSuperTypes implements Publisher {
else {
// Dead Event must EXACTLY MATCH (no subclasses)
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -116,7 +107,7 @@ class PublisherExactWithSuperTypes implements Publisher {
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
@ -125,7 +116,7 @@ class PublisherExactWithSuperTypes implements Publisher {
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2,
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2,
messageClass3); // can return null
// lock.unlockRead(stamp);
@ -140,7 +131,7 @@ class PublisherExactWithSuperTypes implements Publisher {
else {
// Dead Event must EXACTLY MATCH (no subclasses)
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -162,7 +153,7 @@ class PublisherExactWithSuperTypes implements Publisher {
@Override
public
void publish(final Object[] messages) {
publish((Object) messages);
void publish(final Synchrony synchrony, final Object[] messages) {
publish(synchrony, (Object) messages);
}
}

View File

@ -18,8 +18,9 @@ package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.synchrony.Synchrony;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.lang.reflect.Array;
@ -30,45 +31,41 @@ public
class PublisherExactWithSuperTypesAndVarity implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final SubscriptionManager subManager;
private final AtomicBoolean varArgPossibility;
final VarArgUtils varArgUtils;
public
PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.subManager = subManager;
varArgPossibility = subscriber.getVarArgPossibility();
varArgUtils = subscriber.getVarArgUtils();
varArgPossibility = subManager.getVarArgPossibility();
varArgUtils = subManager.getVarArgUtils();
}
@Override
public
void publish(final Object message1) {
void publish(final Synchrony synchrony, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final boolean isArray = messageClass.isArray();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
synchrony.publish(subscriptions, message1);
}
// publish to var arg, only if not already an array (because that would be unnecessary)
if (varArgPossibility.get() && !isArray) {
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subManager); // CAN NOT RETURN NULL
Subscription sub;
int length = varArgSubs.length;
@ -80,6 +77,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message1;
synchrony.publish(varArgSubs, asArray);
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
@ -88,8 +88,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass,
subscriber); // CAN NOT RETURN NULL
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass, subManager); // CAN NOT RETURN NULL
length = varArgSuperSubs.length;
@ -111,7 +110,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
if (!hasSubs) {
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class);
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -132,14 +131,14 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
@Override
public
void publish(final Object message1, final Object message2) {
void publish(final Synchrony synchrony, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass1, messageClass2); // can return null
// lock.unlockRead(stamp);
boolean hasSubs = false;
@ -160,7 +159,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
// stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subManager); // can NOT return null
// lock.unlockRead(stamp);
final int length = varArgSubs.length;
@ -181,8 +180,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
// stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2,
subscriber); // CAN NOT RETURN NULL
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, subManager); // CAN NOT RETURN NULL
// lock.unlockRead(stamp);
@ -210,7 +208,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
// lock.unlockRead(stamp);
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -232,7 +230,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
@ -240,7 +238,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
final Subscription[] subs = subManager.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
// lock.unlockRead(stamp);
@ -263,7 +261,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
// stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subManager); // can NOT return null
// lock.unlockRead(stamp);
final int length = varArgSubs.length;
@ -287,7 +285,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
// stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3,
subscriber); // CAN NOT RETURN NULL
subManager); // CAN NOT RETURN NULL
// lock.unlockRead(stamp);
@ -316,7 +314,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
@ -338,7 +336,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
@Override
public
void publish(final Object[] messages) {
publish((Object) messages);
void publish(final Synchrony synchrony, final Object[] messages) {
publish(synchrony, (Object) messages);
}
}

View File

@ -1,6 +1,6 @@
package com.lmax.disruptor;
package dorkbox.util.messagebus.publication.disruptor;
import dorkbox.util.messagebus.MessageHolder;
import com.lmax.disruptor.EventFactory;
/**
* @author dorkbox, llc

View File

@ -1,7 +1,8 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.publication.disruptor;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import dorkbox.util.messagebus.synchrony.Synchrony;
import dorkbox.util.messagebus.publication.Publisher;
import java.util.concurrent.atomic.AtomicBoolean;
@ -13,12 +14,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final Publisher publisher;
private final Synchrony syncPublication;
AtomicBoolean shutdown = new AtomicBoolean(false);
public
MessageHandler(Publisher publisher) {
MessageHandler(final Publisher publisher, final Synchrony syncPublication) {
this.publisher = publisher;
this.syncPublication = syncPublication;
}
@Override
@ -27,28 +31,25 @@ class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
final int messageType = event.type;
switch (messageType) {
case MessageType.ONE: {
Object message1 = event.message1;
// System.err.println("(" + sequence + ")" + message1);
// this.workProcessor.release(sequence);
this.publisher.publish(message1);
this.publisher.publish(syncPublication, event.message1);
return;
}
case MessageType.TWO: {
Object message1 = event.message1;
Object message2 = event.message2;
this.publisher.publish(message1, message2);
this.publisher.publish(syncPublication, message1, message2);
return;
}
case MessageType.THREE: {
Object message1 = event.message1;
Object message2 = event.message2;
Object message3 = event.message3;
this.publisher.publish(message1, message2, message3);
this.publisher.publish(syncPublication, message3, message1, message2);
return;
}
case MessageType.ARRAY: {
Object[] messages = event.messages;
this.publisher.publish(messages);
this.publisher.publish(syncPublication, messages);
return;
}
}

View File

@ -1,4 +1,6 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.publication.disruptor;
import dorkbox.util.messagebus.subscription.Subscription;
/**
* @author dorkbox, llc Date: 2/2/15
@ -6,6 +8,7 @@ package dorkbox.util.messagebus;
public
class MessageHolder {
public int type = MessageType.ONE;
public Subscription[] subscriptions;
public Object message1 = null;
public Object message2 = null;

View File

@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.publication.disruptor;
final class MessageType {
public final class MessageType {
public static final int ONE = 1;
public static final int TWO = 2;
public static final int THREE = 3;

View File

@ -1,5 +1,6 @@
package com.lmax.disruptor;
package dorkbox.util.messagebus.publication.disruptor;
import com.lmax.disruptor.ExceptionHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;

View File

@ -1,325 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
@SuppressWarnings("Duplicates")
public
class Subscriber {
public static final float LOAD_FACTOR = 0.8F;
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
final ConcurrentMap<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
ThreadLocal<ArrayList<Subscription>> listCache = new ThreadLocal<ArrayList<Subscription>>() {
@Override
protected
ArrayList<Subscription> initialValue() {
return new ArrayList<Subscription>(8);
}
};
public
Subscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR);
}
public
AtomicBoolean getVarArgPossibility() {
return varArgPossibility;
}
public
VarArgUtils getVarArgUtils() {
return varArgUtils;
}
public
void clear() {
this.subUtils.clear();
this.varArgUtils.clear();
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private
void registerMulti(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final AtomicBoolean varArgPossibility) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
final Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
// TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method?
errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass);
return;
}
case 1: {
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final ArrayList<Subscription> cachedSubs = listCache.get();
ArrayList<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs);
if (subs == null) {
listCache.set(new ArrayList<Subscription>(8));
subs = cachedSubs;
// is this handler able to accept var args?
if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(subscription);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(subscription);
}
}
}
public
void shutdown() {
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
clear();
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle.get(messageClass);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
// convert to Array because the subscriptions can change and we want safe iteration over the list
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass) {
ArrayList<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -183,7 +183,7 @@ class Subscription {
}
public
void publishToSubscription(final Object... messages) throws Throwable {
void publish(final Object... messages) throws Throwable {
final MethodAccess handler = this.handler.getHandler();
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;

View File

@ -15,14 +15,23 @@
*/
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
@ -34,6 +43,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public final
class SubscriptionManager {
public static final float LOAD_FACTOR = 0.8F;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
@ -44,27 +56,67 @@ class SubscriptionManager {
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final Subscriber subscriber;
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
final ConcurrentMap<Class<?>, List<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
ThreadLocal<List<Subscription>> listCache = new ThreadLocal<List<Subscription>>() {
@Override
protected
List<Subscription> initialValue() {
return new CopyOnWriteArrayList<Subscription>();
}
};
private final ClassUtils classUtils;
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
// correct type too)
public
SubscriptionManager(final int numberOfThreads, final Subscriber subscriber) {
this.subscriber = subscriber;
SubscriptionManager(final int numberOfThreads, final ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
// modified ONLY during SUB/UNSUB
this.nonListeners = JavaVersionAdapter.concurrentMap(4, Subscriber.LOAD_FACTOR, numberOfThreads);
this.nonListeners = JavaVersionAdapter.concurrentMap(4, LOAD_FACTOR, numberOfThreads);
// only used during SUB/UNSUB, in a rw lock
this.subscriptionsPerListener = JavaVersionAdapter.concurrentMap(32, Subscriber.LOAD_FACTOR, 1);
// only used during SUB/UNSUB
this.subscriptionsPerListener = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.subUtils = new SubscriptionUtils(classUtils, numberOfThreads, LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, numberOfThreads, LOAD_FACTOR);
}
public
void shutdown() {
this.nonListeners.clear();
subscriber.shutdown();
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear();
this.classUtils.clear();
clear();
}
public
@ -73,6 +125,35 @@ class SubscriptionManager {
return;
}
// when subscribing, this is a GREAT opportunity to figure out the classes/objects loaded -- their hierarchy, AND generate UUIDs
// for each CLASS that can be accessed. This then lets us lookup a UUID for each object that comes in -- if an ID is found (for
// any part of it's object hierarchy) -- it means that we have that listeners for that object. this is MUCH faster checking if
// we have subscriptions first (and failing).
//
// so during subscribe we can check "getUUID for all parameter.class accessed by this listener" -> then during publish "lookup
// UUID of incoming message.class" (+ it's super classes, if necessary) -> then check if UUID exists. If yes, then we know there
// are subs. if no - then it's a dead message.
//
// This lets us accomplish TWO things
// 1) be able quickly determine if there are dead messages
// 2) be able to create "multi-class" UUIDs, when two+ classes are represented (always) by the same UUID, by a clever mixing of
// the classes individual UUIDs.
//
// The generation of UUIDs happens ONLY during subscribe, and during publish they are looked up. This UUID can be a simple
// AtomicInteger that starts a MIN_VALUE and count's up.
// note: we can do PRE-STARTUP instrumentation (ie, BEFORE any classes are loaded by the classloader) and inject the UUID into
// every object (as a public static final field), then use reflection to look up this value. It would go something like this:
// 1) scan every class for annotations that match
// 2) for each method that has our annotation -- get the list of classes + hierarchy that are the parameters for the method
// 3) inject the UUID field into each class object that #2 returns, only if it doesn't already exist. use invalid field names
// (ie: start with numbers or ? or ^ or something
//
// then during SUB/UNSUB/PUB, we use this UUID for everything (and we can have multi-UUID lookups for the 'multi-arg' thing).
// If there is no UUID, then we just abort the SUB/UNSUB or send a deadmessage
final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
@ -81,7 +162,7 @@ class SubscriptionManager {
}
// these are concurrent collections
subscriber.clear();
clear();
// this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist.
// ONCE subscriptions are in THIS map, they are considered AVAILABLE.
@ -103,7 +184,7 @@ class SubscriptionManager {
final AtomicBoolean varArgPossibility = subscriber.varArgPossibility;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription;
MessageHandler messageHandler;
@ -111,7 +192,7 @@ class SubscriptionManager {
Class<?> handlerType;
// create the subscriptions
final ConcurrentMap<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = subscriber.subscriptionsPerMessageSingle;
final ConcurrentMap<Class<?>, List<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
subscriptions = new Subscription[handlersSize];
for (int i = 0; i < handlersSize; i++) {
@ -129,10 +210,10 @@ class SubscriptionManager {
handlerType = messageHandlerTypes[0];
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final ArrayList<Subscription> cachedSubs = subscriber.listCache.get();
ArrayList<Subscription> subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs);
final List<Subscription> cachedSubs = listCache.get();
List<Subscription> subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs);
if (subs == null) {
subscriber.listCache.set(new ArrayList<Subscription>(8));
listCache.set(new CopyOnWriteArrayList<Subscription>());
}
// create the subscription. This can be thrown away if the subscription succeeds in another thread
@ -144,11 +225,10 @@ class SubscriptionManager {
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
// putIfAbsent
final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub
if (previousSubs != null) {
// another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure
// all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUT the correct list again.
// all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUCT the correct list again.
// This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle.
// since nothing is yet "subscribed" we can assign the correct values for everything now
@ -195,7 +275,7 @@ class SubscriptionManager {
}
// these are concurrent collections
subscriber.clear();
clear();
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
@ -207,4 +287,248 @@ class SubscriptionManager {
}
}
}
public
AtomicBoolean getVarArgPossibility() {
return varArgPossibility;
}
public
VarArgUtils getVarArgUtils() {
return varArgUtils;
}
public
void clear() {
this.subUtils.clear();
this.varArgUtils.clear();
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private
void registerMulti(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, List<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final AtomicBoolean varArgPossibility) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
final Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
// TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method?
errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass);
return;
}
case 1: {
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final List<Subscription> cachedSubs = listCache.get();
List<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs);
if (subs == null) {
listCache.set(new CopyOnWriteArrayList<Subscription>());
// listCache.set(new ArrayList<Subscription>(8));
subs = cachedSubs;
// is this handler able to accept var args?
if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(subscription);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(subscription);
}
}
}
public
List<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle.get(messageClass);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass) {
final List<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
// convert to Array because the subscriptions can change and we want safe iteration over the list
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass) {
List<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -0,0 +1,150 @@
package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
/**
*
*/
public
class AsyncABQ implements Synchrony {
private final ErrorHandlingSupport errorHandler;
private final ArrayBlockingQueue<Object> dispatchQueue;
private final Collection<Thread> threads;
/**
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown = false;
public
AsyncABQ(final int numberOfThreads,
final ErrorHandlingSupport errorHandler,
final Publisher publisher,
final Synchrony syncPublication) {
this.errorHandler = errorHandler;
this.dispatchQueue = new ArrayBlockingQueue<Object>(1024);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
final ArrayBlockingQueue<?> IN_QUEUE = AsyncABQ.this.dispatchQueue;
final Publisher publisher1 = publisher;
final Synchrony syncPublication1 = syncPublication;
while (!AsyncABQ.this.shuttingDown) {
try {
//noinspection InfiniteLoopStatement
while (true) {
final Object take = IN_QUEUE.take();
publisher1.publish(syncPublication1, take);
}
} catch (InterruptedException e) {
if (!AsyncABQ.this.shuttingDown) {
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// break;
// }
// case 2: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node)));
// break;
// }
// case 3: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node),
// MultiNode.lpItem3(node)));
// break;
// }
// default: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// }
// }
}
}
}
}
};
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
}
}
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
this.dispatchQueue.put(message1);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable {
}
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
for (Thread t : this.threads) {
t.start();
}
}
public
void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
}
}
public
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
}
}

View File

@ -0,0 +1,180 @@
package dorkbox.util.messagebus.synchrony;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.publication.disruptor.EventBusFactory;
import dorkbox.util.messagebus.publication.disruptor.MessageHandler;
import dorkbox.util.messagebus.publication.disruptor.MessageHolder;
import dorkbox.util.messagebus.publication.disruptor.MessageType;
import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public
class AsyncDisruptor implements Synchrony {
private final ErrorHandlingSupport errorHandler;
private WorkProcessor[] workProcessors;
private MessageHandler[] handlers;
private RingBuffer<MessageHolder> ringBuffer;
private Sequence workSequence;
public
AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Publisher publisher, final Synchrony syncPublication) {
this.errorHandler = errorHandler;
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
new NamedThreadFactory("MessageBus"));
final PublicationExceptionHandler<MessageHolder> exceptionHandler = new PublicationExceptionHandler<MessageHolder>(errorHandler);
EventBusFactory factory = new EventBusFactory();
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(publisher, syncPublication); // exactly one per thread is used
}
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
final int BUFFER_SIZE = 32;
// final int BUFFER_SIZE = 16;
// final int BUFFER_SIZE = 8;
// final int BUFFER_SIZE = 4;
WaitStrategy consumerWaitStrategy;
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
final int numWorkers = handlers.length;
workProcessors = new WorkProcessor[numWorkers];
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer,
sequenceBarrier,
handlers[i],
exceptionHandler, workSequence);
}
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence()
.set(cursor);
executor.execute(processor);
}
}
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE;
job.subscriptions = subscriptions;
job.message1 = message1;
ringBuffer.publish(seq);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable {
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[workProcessors.length + 1];
for (int i = 0, size = workProcessors.length; i < size; i++) {
sequences[i] = workProcessors[i].getSequence();
}
sequences[sequences.length - 1] = workSequence; // always add the work sequence
return sequences;
}
public
void start() {
}
public
void shutdown() {
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
}
public
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,148 @@
package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.LinkedTransferQueue;
/**
*
*/
public
class AsyncLTQ implements Synchrony {
private final ErrorHandlingSupport errorHandler;
private final LinkedTransferQueue<Object> dispatchQueue;
private final Collection<Thread> threads;
/**
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown = false;
public
AsyncLTQ(final int numberOfThreads,
final ErrorHandlingSupport errorHandler,
final Publisher publisher,
final Synchrony syncPublication) {
this.errorHandler = errorHandler;
this.dispatchQueue = new LinkedTransferQueue<Object>();
this.threads = new ArrayDeque<Thread>(numberOfThreads);
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
LinkedTransferQueue<?> IN_QUEUE = AsyncLTQ.this.dispatchQueue;
final Publisher publisher1 = publisher;
final Synchrony syncPublication1 = syncPublication;
while (!AsyncLTQ.this.shuttingDown) {
try {
//noinspection InfiniteLoopStatement
while (true) {
final Object take = IN_QUEUE.take();
publisher1.publish(syncPublication1, take);
}
} catch (InterruptedException e) {
if (!AsyncLTQ.this.shuttingDown) {
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// break;
// }
// case 2: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node)));
// break;
// }
// case 3: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node),
// MultiNode.lpItem3(node)));
// break;
// }
// default: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// }
// }
}
}
}
}
};
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
}
}
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
this.dispatchQueue.transfer(message1);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable {
}
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
for (Thread t : this.threads) {
t.start();
}
}
public
void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
}
}
public
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
}
}

View File

@ -0,0 +1,184 @@
package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.LinkedTransferQueue;
/**
*
*/
public
class AsyncMisc implements Synchrony {
/**
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown = false;
// private final LinkedBlockingQueue<Object> dispatchQueue;
// private final ArrayBlockingQueue<Object> dispatchQueue;
private final LinkedTransferQueue<Object> dispatchQueue;
private final Collection<Thread> threads;
public
AsyncMisc(final int numberOfThreads,
final ErrorHandlingSupport errorHandler,
final Publisher publisher,
final Synchrony syncPublication) {
// this.dispatchQueue = new LinkedBlockingQueue<Object>(1024);
// this.dispatchQueue = new ArrayBlockingQueue<Object>(1024);
this.dispatchQueue = new LinkedTransferQueue<Object>();
this.threads = new ArrayDeque<Thread>(numberOfThreads);
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
// LinkedBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
// ArrayBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
LinkedTransferQueue<?> IN_QUEUE = AsyncMisc.this.dispatchQueue;
// MultiNode node = new MultiNode();
while (!AsyncMisc.this.shuttingDown) {
try {
//noinspection InfiniteLoopStatement
while (true) {
// IN_QUEUE.take(node);
final Object take = IN_QUEUE.take();
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// publish(take);
// break;
// }
// case 2: {
// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node));
// break;
// }
// case 3: {
// publish(MultiNode.lpItem1(node), MultiNode.lpItem2(node), MultiNode.lpItem3(node));
// break;
// }
// default: {
// publish(MultiNode.lpItem1(node));
// }
// }
}
} catch (InterruptedException e) {
if (!AsyncMisc.this.shuttingDown) {
// Integer type = (Integer) MultiNode.lpMessageType(node);
// switch (type) {
// case 1: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// break;
// }
// case 2: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node)));
// break;
// }
// case 3: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node),
// MultiNode.lpItem2(node),
// MultiNode.lpItem3(node)));
// break;
// }
// default: {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Thread interrupted while processing message")
// .setCause(e)
// .setPublishedObject(MultiNode.lpItem1(node)));
// }
// }
}
}
}
}
};
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
}
}
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
// try {
// this.dispatchQueue.transfer(message);
//// this.dispatchQueue.put(message);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
// }
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable {
}
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
for (Thread t : this.threads) {
t.start();
}
}
public
void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
}
}
public
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
}
}

View File

@ -0,0 +1,51 @@
package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.subscription.Subscription;
public
class Sync implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
}
@Override
public
void publish(final Subscription[] subscriptions, final Object[] messages) throws Throwable {
}
@Override
public
void start() {
}
@Override
public
void shutdown() {
}
@Override
public
boolean hasPendingMessages() {
return false;
}
}

View File

@ -0,0 +1,18 @@
package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.subscription.Subscription;
/**
*
*/
public
interface Synchrony {
void publish(final Subscription[] subscriptions, Object message1) throws Throwable;
void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ;
void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ;
void publish(final Subscription[] subscriptions, Object[] messages) throws Throwable ;
void start();
void shutdown();
boolean hasPendingMessages();
}

View File

@ -17,10 +17,11 @@ package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public final
@ -35,13 +36,13 @@ class SubscriptionUtils {
public
SubscriptionUtils(final ClassUtils superClass, final float loadFactor) {
SubscriptionUtils(final ClassUtils superClass, final int numberOfThreads, final float loadFactor) {
this.superClass = superClass;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, 1);
this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, numberOfThreads);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
}
@ -62,7 +63,7 @@ class SubscriptionUtils {
* @return CAN NOT RETURN NULL
*/
public
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, final Subscriber subscriber) {
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
@ -75,7 +76,7 @@ class SubscriptionUtils {
final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
Class<?> superClass;
ArrayList<Subscription> superSubs;
List<Subscription> superSubs;
Subscription sub;
final int length = superClasses.length;
@ -84,7 +85,7 @@ class SubscriptionUtils {
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
superSubs = subscriber.getExactAsArray(superClass);
superSubs = subManager.getExactAsArray(superClass);
if (superSubs != null) {
superSubLength = superSubs.size();
@ -115,7 +116,7 @@ class SubscriptionUtils {
* @return CAN NOT RETURN NULL
*/
public
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Subscriber subscriber) {
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> cached = this.superClassSubscriptionsMulti;
@ -154,7 +155,7 @@ class SubscriptionUtils {
continue;
}
superSubs = subscriber.getExactAsArray(superClass1, superClass2);
superSubs = subManager.getExactAsArray(superClass1, superClass2);
if (superSubs != null) {
for (int k = 0; k < superSubs.size(); k++) {
sub = superSubs.get(k);
@ -184,7 +185,7 @@ class SubscriptionUtils {
*/
public
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Class<?> clazz3,
final Subscriber subscriber) {
final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
@ -234,7 +235,7 @@ class SubscriptionUtils {
continue;
}
superSubs = subscriber.getExactAsArray(superClass1, superClass2, superClass3);
superSubs = subManager.getExactAsArray(superClass1, superClass2, superClass3);
if (superSubs != null) {
for (int m = 0; m < superSubs.size(); m++) {
sub = superSubs.get(m);

View File

@ -18,10 +18,11 @@ package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public final
@ -36,14 +37,14 @@ class VarArgUtils {
public
VarArgUtils(final ClassUtils superClassUtils, final float loadFactor) {
VarArgUtils(final ClassUtils superClassUtils, final int numberOfThreads, final float loadFactor) {
this.superClassUtils = superClassUtils;
this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
}
@ -62,7 +63,7 @@ class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public
Subscription[] getVarArgSubscriptions(final Class<?> messageClass, final Subscriber subscriber) {
Subscription[] getVarArgSubscriptions(final Class<?> messageClass, final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptionsSingle;
@ -72,7 +73,7 @@ class VarArgUtils {
// this gets (and caches) our array type. This is never cleared.
final Class<?> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
final ArrayList<Subscription> subs = subscriber.getExactAsArray(arrayVersion);
final List<Subscription> subs = subManager.getExactAsArray(arrayVersion);
if (subs != null) {
final int length = subs.size();
varArgSubs = new ArrayList<Subscription>(length);
@ -104,8 +105,8 @@ class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public
Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass, final Subscriber subscriber) {
final ArrayList<Subscription> subs = getVarArgSuperSubscriptions_List(messageClass, subscriber);
Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass, final SubscriptionManager subManager) {
final ArrayList<Subscription> subs = getVarArgSuperSubscriptions_List(messageClass, subManager);
final Subscription[] returnedSubscriptions = new Subscription[subs.size()];
subs.toArray(returnedSubscriptions);
@ -114,7 +115,7 @@ class VarArgUtils {
// CAN NOT RETURN NULL
private
ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass, final Subscriber subscriber) {
ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass, final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsSingle;
@ -136,13 +137,13 @@ class VarArgUtils {
Class<?> type;
Subscription sub;
ArrayList<Subscription> subs;
List<Subscription> subs;
int length;
MessageHandler handlerMetadata;
for (int i = 0; i < typesLength; i++) {
type = types[i];
subs = subscriber.getExactAsArray(type);
subs = subManager.getExactAsArray(type);
if (subs != null) {
length = subs.size();
@ -173,7 +174,7 @@ class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public
Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Subscriber subscriber) {
Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
@ -182,8 +183,8 @@ class VarArgUtils {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subManager);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subManager);
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
@ -202,7 +203,7 @@ class VarArgUtils {
// and then, returns the array'd version superclass subscriptions
public
Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3,
final Subscriber subscriber) {
final SubscriptionManager subManager) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
@ -211,9 +212,9 @@ class VarArgUtils {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subManager);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subManager);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subManager);
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3);

View File

@ -22,15 +22,29 @@
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.SubscriptionValidator;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.listeners.*;
import dorkbox.util.messagebus.messages.*;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.listeners.AbstractMessageListener;
import dorkbox.util.messagebus.listeners.ICountableListener;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.IMultipartMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.MultipartMessageListener;
import dorkbox.util.messagebus.listeners.Overloading;
import dorkbox.util.messagebus.listeners.StandardMessageListener;
import dorkbox.util.messagebus.messages.AbstractMessage;
import dorkbox.util.messagebus.messages.ICountable;
import dorkbox.util.messagebus.messages.IMessage;
import dorkbox.util.messagebus.messages.IMultipartMessage;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
import org.junit.Test;
/**
@ -157,11 +171,8 @@ public class SubscriptionManagerTest extends AssertSupport {
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles(
@ -181,21 +192,17 @@ public class SubscriptionManagerTest extends AssertSupport {
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
validator.validate(subscriber);
validator.validate(subscriptionManager);
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1);
listeners.clear();
validator.validate(subscriber);
validator.validate(subscriptionManager);
}
}

View File

@ -22,8 +22,8 @@
*/
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayDeque;
import java.util.Arrays;
@ -60,13 +60,13 @@ public class SubscriptionValidator extends AssertSupport {
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(Subscriber subscriber) {
public void validate(SubscriptionManager subManager) {
for (Class messageType : this.messageTypes) {
Collection<ValidationEntry> validationEntries = getEntries(messageType);
// we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Subscription[] subscriptions = subscriber.getExactAndSuper(messageType);
Subscription[] subscriptions = subManager.getExactAndSuper(messageType);
if (subscriptions != null) {
collection.addAll(Arrays.asList(subscriptions));
}