java 8 stamped lock is faster than reentrantlock. WIP caches

This commit is contained in:
nathan 2015-06-02 23:00:26 +02:00
parent ab32656ce1
commit e70adb2486
22 changed files with 4664 additions and 4714 deletions

View File

@ -14,7 +14,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* messages getSubscriptions published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* in a synchronized fashion using @Synchronized annotation
*
* <p/>
@ -25,7 +25,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* <p/>
* By default, the bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched. This can be changed using the @Listener annotation.
* removed on-the-fly as messages getSubscriptions dispatched. This can be changed using the @Listener annotation.
*
* <p/>
* Generally message handlers will be invoked in inverse sequence of subscription but any
@ -50,7 +50,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
*
* <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
* getSubscriptions dispatched to all message handlers that take an instance of List as their parameter
*
* @Author bennidi
* Date: 2/8/12

View File

@ -1,31 +1,29 @@
package dorkbox.util.messagebus;
import java.util.ArrayDeque;
import java.util.Collection;
import org.jctools.util.Pow2;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import org.jctools.util.Pow2;
import java.util.ArrayDeque;
import java.util.Collection;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
*
* @Author bennidi
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MultiMBassador implements IMessageBus {
public static final String ERROR_HANDLER_MSG =
"INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.addErrorHandler()\n" +
"Falling back to console logger.";
public static final String ERROR_HANDLER_MSG = "INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.addErrorHandler()\n" +
"Falling back to console logger.";
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
@ -36,12 +34,7 @@ public class MultiMBassador implements IMessageBus {
private final Collection<Thread> threads;
/**
* if true, only exact matching will be performed on classes. Setting this to true
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
* system. By default, this is FALSE, to support subTypes and VarArg matching.
*/
private final boolean forceExactMatches;
private final Matcher subscriptionMatcher;
/**
* Notifies the consumers during shutdown, that it's on purpose.
@ -52,7 +45,7 @@ public class MultiMBassador implements IMessageBus {
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
*/
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()/2);
this(Runtime.getRuntime().availableProcessors() / 2);
}
/**
@ -64,21 +57,37 @@ public class MultiMBassador implements IMessageBus {
/**
* @param forceExactMatches if TRUE, only exact matching will be performed on classes. Setting this to true
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
* system. By default, this is FALSE, to support subTypes and VarArg matching.
*
* @param numberOfThreads how many threads to have for dispatching async messages
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
* system. By default, this is FALSE, to support subTypes and VarArg matching.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(boolean forceExactMatches, int numberOfThreads) {
if (numberOfThreads < 2) {
numberOfThreads = 2; // at LEAST 2 threads
}
numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads);
this.forceExactMatches = forceExactMatches;
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
if (forceExactMatches) {
subscriptionMatcher = new Matcher() {
@Override
public Subscription[] getSubscriptions(Class<?> messageClass) {
return subscriptionManager.getSubscriptionsForcedExact(messageClass);
}
};
}
else {
subscriptionMatcher = new Matcher() {
@Override
public Subscription[] getSubscriptions(Class<?> messageClass) {
return subscriptionManager.getSubscriptions(messageClass);
}
};
}
this.threads = new ArrayDeque<Thread>(numberOfThreads);
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
@ -95,33 +104,37 @@ public class MultiMBassador implements IMessageBus {
while (true) {
IN_QUEUE.take(node);
switch (node.messageType) {
case 1: publish(node.item1); continue;
case 2: publish(node.item1, node.item2); continue;
case 3: publish(node.item1, node.item2, node.item3); continue;
case 1:
publish(node.item1);
continue;
case 2:
publish(node.item1, node.item2);
continue;
case 3:
publish(node.item1, node.item2, node.item3);
continue;
}
}
} catch (InterruptedException e) {
if (!MultiMBassador.this.shuttingDown) {
switch (node.messageType) {
case 1: {
handlePublicationError(new PublicationError()
.setMessage("Thread interupted while processing message")
.setCause(e)
.setPublishedObject(node.item1));
handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message")
.setCause(e).setPublishedObject(node.item1));
continue;
}
case 2: {
handlePublicationError(new PublicationError()
.setMessage("Thread interupted while processing message")
.setCause(e)
.setPublishedObject(node.item1, node.item2));
handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message")
.setCause(e).setPublishedObject(node.item1, node.item2));
continue;
}
case 3: {
handlePublicationError(new PublicationError()
.setMessage("Thread interupted while processing message")
.setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3));
handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message")
.setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3));
continue;
}
}
@ -189,37 +202,35 @@ public class MultiMBassador implements IMessageBus {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public void publish(final Object message) {
try {
boolean subsPublished = false;
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
final SubscriptionManager manager = this.subscriptionManager;
final Class<?> messageClass = message.getClass();
Subscription[] subscriptions;
Subscription sub;
if (this.forceExactMatches) {
subscriptions = manager.getSubscriptionsForcedExact(messageClass);
} else {
subscriptions = manager.getSubscriptions(messageClass);
}
subscriptions = subscriptionMatcher.getSubscriptions(messageClass);
int c = 0;
// Run subscriptions
int length = subscriptions.length;
if (length > 0) {
for (int i=0;i<length;i++) {
sub = subscriptions[i];
sub.publish(message);
}
if (subscriptions != null) {
subsPublished = true;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
c = sub.c();
}
}
// if (!this.forceExactMatches) {
// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null
// // now get superClasses
// // now getSubscriptions superClasses
// int length = superSubscriptions.length;
//
// if (length > 0) {
@ -236,7 +247,7 @@ public class MultiMBassador implements IMessageBus {
// if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
// for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
//
//
// // this catches all exception types
// sub.publishToSubscription(this, message);
// }
@ -247,21 +258,21 @@ public class MultiMBassador implements IMessageBus {
// // publish to var arg, only if not already an array
// if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
// Object[] asArray = null;
//
//
// ConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
// if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
// asArray[0] = message;
//
//
// for (iterator = varargSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, asArray);
// }
// }
//
//
// ConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// // now get array based superClasses (but only if those ALSO accept vararg)
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
@ -276,14 +287,13 @@ public class MultiMBassador implements IMessageBus {
// }
// }
if (!subsPublished) {
if (c == 0 && !subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Subscription[] deadSubscriptions = manager.getSubscriptionsForcedExact(DeadMessage.class);
length = deadSubscriptions.length;
if (length > 0) {
if (deadSubscriptions != null) {
DeadMessage deadMessage = new DeadMessage(message);
for (int i=0;i<length;i++) {
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
@ -291,10 +301,8 @@ public class MultiMBassador implements IMessageBus {
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message));
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message));
}
}
@ -306,7 +314,7 @@ public class MultiMBassador implements IMessageBus {
// Class<?> messageClass2 = message2.getClass();
//
// StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
// BooleanHolder subsPublished = this.booleanThreadLocal.get();
// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions();
// subsPublished.bool = false;
//
// ISetEntry<Subscription> current;
@ -326,7 +334,7 @@ public class MultiMBassador implements IMessageBus {
//
// if (!this.forceExactMatches) {
// StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// // now get superClasses
// // now getSubscriptions superClasses
// if (superSubscriptions != null) {
// current = superSubscriptions.head;
// while (current != null) {
@ -360,7 +368,7 @@ public class MultiMBassador implements IMessageBus {
// }
//
// StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// // now get array based superClasses (but only if those ALSO accept vararg)
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass1, 2);
@ -380,7 +388,7 @@ public class MultiMBassador implements IMessageBus {
// } else {
// StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2);
//
// // now get array based superClasses (but only if those ALSO accept vararg)
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
// current = varargSuperMultiSubscriptions.head;
// while (current != null) {
@ -430,7 +438,7 @@ public class MultiMBassador implements IMessageBus {
// Class<?> messageClass3 = message3.getClass();
//
// StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
// BooleanHolder subsPublished = this.booleanThreadLocal.get();
// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions();
// subsPublished.bool = false;
//
// ISetEntry<Subscription> current;
@ -451,7 +459,7 @@ public class MultiMBassador implements IMessageBus {
//
// if (!this.forceExactMatches) {
// StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// // now get superClasses
// // now getSubscriptions superClasses
// if (superSubscriptions != null) {
// current = superSubscriptions.head;
// while (current != null) {
@ -485,7 +493,7 @@ public class MultiMBassador implements IMessageBus {
// }
//
// StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// // now get array based superClasses (but only if those ALSO accept vararg)
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass1, 3);
@ -506,7 +514,7 @@ public class MultiMBassador implements IMessageBus {
// } else {
// StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
//
// // now get array based superClasses (but only if those ALSO accept vararg)
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
// current = varargSuperMultiSubscriptions.head;
// while (current != null) {
@ -554,12 +562,11 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(message);
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message));
}
} else {
}
else {
throw new NullPointerException("Message cannot be null.");
}
}
@ -570,12 +577,11 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(message1, message2);
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2));
}
} else {
}
else {
throw new NullPointerException("Messages cannot be null.");
}
}
@ -586,12 +592,11 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(message1, message2, message3);
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2, message3));
}
} else {
}
else {
throw new NullPointerException("Messages cannot be null.");
}
}

View File

@ -4,8 +4,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import dorkbox.util.messagebus.common.thread.StampedLock;
import java.util.concurrent.locks.StampedLock;
abstract class pad<T> extends item<T> {
@ -21,15 +20,15 @@ abstract class pad<T> extends item<T> {
* @author bennidi
* Date: 2/12/12
*/
public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T> {
public abstract class AbstractConcurrentSet<T> implements Set<T> {
private static final AtomicLong id = new AtomicLong();
private final transient long ID = id.getAndIncrement();
// Internal state
protected final StampedLock lock = new StampedLock();
private final transient Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
volatile long y0, y1, y2, y4, y5, y6 = 7L;
public volatile Entry<T> head; // reference to the first element
volatile long z0, z1, z2, z4, z5, z6 = 7L;
@ -44,7 +43,7 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
if (element == null) {
return false;
}
boolean changed = false;
boolean changed;
long stamp = this.lock.readLock();
if (this.entries.containsKey(element)) {
@ -67,15 +66,11 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
@Override
public boolean contains(Object element) {
long stamp = this.lock.tryOptimisticRead();
long stamp = this.lock.readLock();
ISetEntry<T> entry = this.entries.get(element);
if (!this.lock.validate(stamp)) {
stamp = this.lock.readLock();
entry = this.entries.get(element);
this.lock.unlockRead(stamp);
}
this.lock.unlockRead(stamp);
return entry != null && entry.getValue() != null;
}
@ -125,15 +120,11 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
@Override
public boolean remove(Object element) {
StampedLock lock = this.lock;
long stamp = lock.tryOptimisticRead();
long stamp = lock.readLock();
ISetEntry<T> entry = this.entries.get(element);
if (!lock.validate(stamp)) {
stamp = lock.readLock();
entry = this.entries.get(element);
lock.unlockRead(stamp);
}
lock.unlockRead(stamp);
if (entry == null || entry.getValue() == null) {
return false; // fast exit

View File

@ -1,15 +1,15 @@
package dorkbox.util.messagebus.common;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
/**
* Simple tree structure that is a map that contains a chain of keys to get to a value.
* Simple tree structure that is a map that contains a chain of keys to getSubscriptions to a value.
* <p>
* THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes
*
@ -506,7 +506,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> objectTree = null;
// get value from our children
// getSubscriptions value from our children
objectTree = getLeaf_NL(key); // protected by lock
if (objectTree == null) {
@ -525,7 +525,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(key1); // protected by lock
if (tree != null) {
tree = tree.getLeaf_NL(key2); // protected by lock
@ -547,7 +547,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
@ -573,7 +573,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(keys[0]);
int size = keys.length;
@ -625,7 +625,7 @@ public class HashMapTree<KEY, VALUE> {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
@ -642,7 +642,7 @@ public class HashMapTree<KEY, VALUE> {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
@ -668,7 +668,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
// getSubscriptions value from our children
tree = getLeaf_NL(keys[0]);
for (int i=1;i<size;i++) {
@ -696,4 +696,4 @@ public class HashMapTree<KEY, VALUE> {
return tree;
}
}
}

View File

@ -1,14 +1,13 @@
package dorkbox.util.messagebus.common;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
@ -30,11 +29,11 @@ import dorkbox.util.messagebus.annotations.Synchronized;
*/
public class MessageHandler {
// get all listeners defined by the given class (includes
// getSubscriptions all listeners defined by the given class (includes
// listeners defined in super classes)
public static final MessageHandler[] get(final Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
// getSubscriptions all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
@ -97,32 +96,32 @@ public class MessageHandler {
this.acceptsVarArgs = handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs();
}
public boolean isSynchronized(){
public final boolean isSynchronized() {
return this.isSynchronized;
}
public MethodAccess getHandler() {
public final MethodAccess getHandler() {
return this.handler;
}
public int getMethodIndex() {
public final int getMethodIndex() {
return this.methodIndex;
}
public Class<?>[] getHandledMessages() {
public final Class<?>[] getHandledMessages() {
return this.handledMessages;
}
public boolean acceptsSubtypes() {
public final boolean acceptsSubtypes() {
return this.acceptsSubtypes;
}
public boolean acceptsVarArgs() {
public final boolean acceptsVarArgs() {
return this.acceptsVarArgs;
}
@Override
public int hashCode() {
public final int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.acceptsSubtypes ? 1231 : 1237);
@ -133,7 +132,7 @@ public class MessageHandler {
}
@Override
public boolean equals(Object obj) {
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}

View File

@ -84,4 +84,4 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
return this.value;
}
}
}
}

View File

@ -11,11 +11,11 @@ public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> {
public StrongConcurrentSetV8(int size, float loadFactor) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, 1));
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, 16));
}
public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, stripeSize));
}
}
}

View File

@ -0,0 +1,93 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
public class SuperClassUtils {
private final Map<Class<?>, Class<?>> versionCache;
private final Map<Class<?>, Class<?>[]> superClassesCache;
public SuperClassUtils(float loadFactor, int stripeSize) {
this.versionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, stripeSize);
}
/**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
* <p>
* if parameter clazz is of type array, then the super classes are of array type as well
* <p>
* protected by read lock by caller. The cache version is called first, by write lock
*/
public final Class<?>[] getSuperClasses(final Class<?> clazz, final boolean isArray) {
// this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
if (classes == null) {
// getSubscriptions all super types of class
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
final int length = superTypes.length;
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(length);
Class<?> c;
if (isArray) {
for (int i = 0; i < length; i++) {
c = superTypes[i];
c = getArrayClass(c);
if (c != clazz) {
newList.add(c);
}
}
}
else {
for (int i = 0; i < length; i++) {
c = superTypes[i];
if (c != clazz) {
newList.add(c);
}
}
}
classes = newList.toArray(new Class<?>[newList.size()]);
local.put(clazz, classes);
}
return classes;
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public final Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> versionCache = this.versionCache;
Class<?> clazz = versionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
versionCache.put(c, clazz);
}
return clazz;
}
/**
* Clears the caches on shutdown
*/
public final void shutdown() {
this.versionCache.clear();
this.superClassesCache.clear();
}
}

View File

@ -1,12 +1,13 @@
package dorkbox.util.messagebus.common;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
public class VarArgUtils {
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> varArgSubscriptions;
@ -22,8 +23,8 @@ public class VarArgUtils {
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
float loadFactor, int stripeSize) {
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor,
int stripeSize) {
this.utils = utils;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
@ -53,7 +54,7 @@ public class VarArgUtils {
//
// // whenever our subscriptions change, this map is cleared.
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.getSubscriptions();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
@ -67,7 +68,7 @@ public class VarArgUtils {
// Iterator<Subscription> iterator;
// Subscription sub;
//
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.getSubscriptions(arrayVersion);
// if (subs != null) {
// for (iterator = subs.iterator(); iterator.hasNext();) {
// sub = iterator.next();
@ -95,7 +96,7 @@ public class VarArgUtils {
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
//
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.getSubscriptions();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
@ -122,7 +123,7 @@ public class VarArgUtils {
// for (iterator = types.iterator(); iterator.hasNext();) {
// superClass = iterator.next();
//
// Collection<Subscription> subs = local2.get(superClass);
// Collection<Subscription> subs = local2.getSubscriptions(superClass);
// if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next();
@ -158,7 +159,7 @@ public class VarArgUtils {
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.get();
// subsPerType = subHolderConcurrent.getSubscriptions();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
// if (putIfAbsent != null) {
@ -189,10 +190,11 @@ public class VarArgUtils {
}
// CAN NOT RETURN NULL
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
@ -205,7 +207,7 @@ public class VarArgUtils {
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.get();
// subsPerType = subHolderConcurrent.getSubscriptions();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
// if (putIfAbsent != null) {

View File

@ -3,8 +3,7 @@ package dorkbox.util.messagebus.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
import dorkbox.util.messagebus.common.thread.StampedLock;
import java.util.concurrent.locks.StampedLock;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of
@ -38,6 +37,8 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
StampedLock lock = WeakConcurrentSet.this.lock;
long stamp = lock.writeLock();
// final Lock writeLock = WeakConcurrentSet.this.lock.writeLock();
// writeLock.lock();
try{
do {
ISetEntry<T> orphaned = this.current;
@ -47,6 +48,7 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
}
finally {
lock.unlockWrite(stamp);
// writeLock.unlock();
}
}
@ -118,9 +120,5 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
public T getValue() {
return this.value.get();
}
}
}

View File

@ -0,0 +1,11 @@
package dorkbox.util.messagebus.common;
import java.util.Iterator;
abstract class item3<T> implements Iterator<T> {
public ISetEntry<T> current;
public item3(ISetEntry<T> current) {
this.current = current;
}
}

View File

@ -1,25 +1,12 @@
package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lpItem1;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lpItem2;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lpItem3;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lpThread;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lpType;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lvMessageType;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.lvThread;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.soThread;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spItem1;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spItem2;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spItem3;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spMessageType;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spThread;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.spType;
import java.util.concurrent.ThreadLocalRandom;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.UnsafeAccess;
import java.util.concurrent.ThreadLocalRandom;
import static dorkbox.util.messagebus.common.simpleq.MultiNode.*;
public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
private static final int TYPE_EMPTY = 0;
@ -445,7 +432,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
// final Object node = nodeThreadLocal.get();
// final Object node = nodeThreadLocal.getSubscriptions();
spType(node, TYPE_CONSUMER);
spThread(node, myThread);

View File

@ -1,13 +1,13 @@
package dorkbox.util.messagebus.common.thread;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
@ -39,7 +39,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
return false;
}
// had to modify the super implementation so we get Node<T> back
// had to modify the super implementation so we getSubscriptions Node<T> back
Node<T> alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER);
if (alreadyPresent == null) {
// this doesn't already exist

View File

@ -0,0 +1,5 @@
package dorkbox.util.messagebus.subscription;
public interface Matcher {
Subscription[] getSubscriptions(Class<?> messageClass);
}

View File

@ -1,19 +1,17 @@
package dorkbox.util.messagebus.subscription;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.omg.CORBA.BooleanHolder;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import org.omg.CORBA.BooleanHolder;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered
@ -43,9 +41,10 @@ public class Subscription {
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSetV8<Object>(16, 0.85F, 16);
this.listeners = new StrongConcurrentSetV8<Object>(16, 0.85F, 15);
// this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
// this.listeners = new ConcurrentLinkedQueue2<Object>();
// this.listeners = new CopyOnWriteArrayList<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
@ -59,26 +58,26 @@ public class Subscription {
return this.handlerMetadata.getHandledMessages();
}
public boolean acceptsSubtypes() {
public final boolean acceptsSubtypes() {
return this.handlerMetadata.acceptsSubtypes();
}
public boolean acceptsVarArgs() {
public final boolean acceptsVarArgs() {
return this.handlerMetadata.acceptsVarArgs();
}
public boolean isEmpty() {
public final boolean isEmpty() {
return this.listeners.isEmpty();
}
public void subscribe(Object listener) {
public final void subscribe(Object listener) {
this.listeners.add(listener);
}
/**
* @return TRUE if the element was removed
*/
public boolean unsubscribe(Object existingListener) {
public final boolean unsubscribe(Object existingListener) {
return this.listeners.remove(existingListener);
}
@ -96,9 +95,9 @@ public class Subscription {
* @return true if there were listeners for this publication, false if there was nothing
*/
public final void publish(final Object message) throws Throwable {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
Object listener;
@ -107,7 +106,6 @@ public class Subscription {
listener = iterator.next();
// this.c++;
invocation.invoke(listener, handler, handleIndex, message);
}
}

View File

@ -1,28 +1,25 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
* message handlers.
*
*
* <p>
* <p>
* Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription),
* getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really
* complicates this, so it has been modified for subscribe/unsubscibe to be mutually exclusive.
*
* <p>
* Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread
* manage sub/unsub.
*
@ -58,24 +55,25 @@ public class SubscriptionManager {
private final VarArgUtils varArgUtils;
// private final StampedLock lock = new StampedLock();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final StampedLock lock = new StampedLock();
// private final ReadWriteLock lock = new ReentrantReadWriteLock();
SubscriptionManager(int numberOfThreads) {
public SubscriptionManager(int numberOfThreads) {
float loadFactor = SubscriptionManager.LOAD_FACTOR;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, numberOfThreads);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(64, LOAD_FACTOR, 1);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>();
}
this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads);
this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor,
numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
@ -118,8 +116,11 @@ public class SubscriptionManager {
if (subscriptions == null) {
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
Lock writeLock = this.lock.writeLock();
writeLock.lock();
StampedLock lock = this.lock;
long stamp = lock.writeLock();
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
ConcurrentMap<Class<?>, Subscription[]> subsPerListener2 = this.subscriptionsPerListener;
subscriptions = subsPerListener2.get(listenerClass);
@ -132,7 +133,8 @@ public class SubscriptionManager {
// remember the class as non listening class if no handlers are found
if (handlersSize == 0) {
this.nonListeners.put(listenerClass, Boolean.TRUE);
writeLock.unlock();
lock.unlockWrite(stamp);
// writeLock.unlock();
return;
}
@ -147,7 +149,7 @@ public class SubscriptionManager {
// create the subscription
MessageHandler messageHandler;
for (int i=0;i<handlersSize;i++) {
for (int i = 0; i < handlersSize; i++) {
messageHandler = messageHandlers[i];
// create the subscription
@ -157,24 +159,28 @@ public class SubscriptionManager {
subsPerListener.add(subscription); // activates this sub for sub/unsub
// now add this subscription to each of the handled types
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle,
subsPerMessageMulti, varArgPossibility);
subsForPublication.add(subscription); // activates this sub for publication
}
subsPerListener2.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()]));
writeLock.unlock();
lock.unlockWrite(stamp);
// writeLock.unlock();
return;
} else {
writeLock.unlock();
}
else {
lock.unlockWrite(stamp);
// writeLock.unlock();
}
}
// subscriptions already exist and must only be updated
// only get here if our single-check was OK, or our double-check was OK
// only getSubscriptions here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i=0;i<subscriptions.length;i++) {
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.subscribe(listener);
}
@ -183,9 +189,9 @@ public class SubscriptionManager {
// inside a write lock
// also puts it into the correct map if it's not already there
private Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final VarArgPossibility varArgPossibility) {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final VarArgPossibility varArgPossibility) {
final int size = messageHandlerTypes.length;
@ -200,15 +206,13 @@ public class SubscriptionManager {
if (subs == null) {
subs = new ArrayList<Subscription>();
boolean isArray = utils.isArray(type0);
boolean isArray = type0.isArray();
if (isArray) {
varArgPossibility.set(true);
}
// cache the super classes
// todo: makes it's own read/write lock. it's 2x as expensive when running inside the writelock for subscribe, VS on it's own
// maybe even use StampedLock
utils.cacheSuperClasses(type0, isArray);
// utils.cacheSuperClasses(type0, isArray);
subsPerMessageSingle.put(type0, subs);
}
@ -218,7 +222,7 @@ public class SubscriptionManager {
case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
// subsPerType = subHolderSingle.getSubscriptions();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
// if (putIfAbsent != null) {
@ -236,7 +240,7 @@ public class SubscriptionManager {
case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
// subsPerType = subHolderSingle.getSubscriptions();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
// if (putIfAbsent != null) {
@ -255,7 +259,7 @@ public class SubscriptionManager {
default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
// subsPerType = subHolderSingle.getSubscriptions();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
// if (putIfAbsent != null) {
@ -297,7 +301,7 @@ public class SubscriptionManager {
if (subscriptions != null) {
Subscription subscription;
for (int i=0;i<subscriptions.length;i++) {
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.unsubscribe(listener);
}
@ -309,13 +313,18 @@ public class SubscriptionManager {
this.varArgUtils.clear();
}
private final Subscription[] getListenerSubs(Class<?> listenerClass) {
private Subscription[] getListenerSubs(Class<?> listenerClass) {
Subscription[] subscriptions;
Lock readLock = this.lock.readLock();
readLock.lock();
StampedLock lock = this.lock;
long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
subscriptions = this.subscriptionsPerListener.get(listenerClass);
readLock.unlock();
lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions;
}
@ -326,46 +335,59 @@ public class SubscriptionManager {
ArrayList<Subscription> collection;
Subscription[] subscriptions;
Lock readLock = this.lock.readLock();
readLock.lock();
StampedLock lock = this.lock;
long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass);
//
if (collection != null) {
subscriptions = collection.toArray(new Subscription[collection.size()]);
} else {
subscriptions = EMPTY;
}
else {
// subscriptions = EMPTY;
subscriptions = null;
}
readLock.unlock();
lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions;
}
// never return null
public final Subscription[] getSubscriptions(final Class<?> messageClass) {
ArrayList<Subscription> collection;
Subscription[] subscriptions = null;
Lock readLock = this.lock.readLock();
readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass);
StampedLock lock = this.lock;
long stamp = lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now getSubscriptions superClasses
ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
// now get superClasses
ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null
collection.addAll(superSubscriptions);
} else {
// now get superClasses
collection = this.utils.getSuperSubscriptions(messageClass); // NOT return null
}
else {
collection = superSubscriptions;
}
subscriptions = collection.toArray(new Subscription[collection.size()]);
readLock.unlock();
final Subscription[] subscriptions;
if (collection != null) {
subscriptions = collection.toArray(new Subscription[collection.size()]);
}
else {
subscriptions = null;
}
lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions;
}
@ -381,7 +403,7 @@ public class SubscriptionManager {
// readLock.lock();
//
// try {
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType);
// if (collection != null) {
// subscriptions = collection.toArray(EMPTY);
// }
@ -394,7 +416,7 @@ public class SubscriptionManager {
//
//// long stamp = this.lock.tryOptimisticRead(); // non blocking
////
//// collection = this.subscriptionsPerMessageSingle.get(messageType);
//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType);
//// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection);
@ -407,7 +429,7 @@ public class SubscriptionManager {
//// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
//// stamp = this.lock.readLock();
//// try {
//// collection = this.subscriptionsPerMessageSingle.get(messageType);
//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType);
//// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection);
@ -448,7 +470,8 @@ public class SubscriptionManager {
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2,
Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
@ -481,7 +504,8 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
public Collection<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
return this.varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
}

View File

@ -1,33 +1,24 @@
package dorkbox.util.messagebus.common;
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.SuperClassUtils;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.StampedLock;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionUtils {
private static final Class<?>[] SUPER_CLASS_EMPTY = new Class<?>[0];
private final SuperClassUtils superClass;
private StampedLock superClassLock = new StampedLock();
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final Map<Class<?>, Class<?>[]> superClassesCache;
private final ClassHolder classHolderSingle;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
// it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one
private final Map<Class<?>, ArrayList<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptionsMulti;
@ -40,16 +31,15 @@ public class SubscriptionUtils {
public SubscriptionUtils(Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti,
float loadFactor, int stripeSize) {
HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, float loadFactor,
int stripeSize) {
this.superClass = new SuperClassUtils(loadFactor, 1);
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 8);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
@ -65,66 +55,15 @@ public class SubscriptionUtils {
this.superClassSubscriptions.clear();
}
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*
* if parameter clazz is of type array, then the super classes are of array type as well
*
* protected by read lock by caller. The cache version is called first, by write lock
*/
public final Class<?>[] getSuperClasses_NL(final Class<?> clazz, final boolean isArray) {
// this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
if (classes == null) {
// get all super types of class
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
int length = superTypes.length;
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(length);
Class<?> c;
if (isArray) {
for (int i=0;i<length;i++) {
c = superTypes[i];
c = getArrayClass(c);
if (c != clazz) {
newList.add(c);
}
}
} else {
for (int i=0;i<length;i++) {
c = superTypes[i];
if (c != clazz) {
newList.add(c);
}
}
}
classes = newList.toArray(new Class<?>[newList.size()]);
local.put(clazz, classes);
}
return classes;
}
// called inside sub/unsub write lock
public final void cacheSuperClasses(final Class<?> clazz) {
getSuperClasses_NL(clazz, isArray(clazz));
this.superClass.getSuperClasses(clazz, clazz.isArray());
}
// called inside sub/unsub write lock
public final void cacheSuperClasses(final Class<?> clazz, final boolean isArray) {
getSuperClasses_NL(clazz, isArray);
this.superClass.getSuperClasses(clazz, isArray);
}
// public final Class<?>[] getSuperClasses(Class<?> clazz, boolean isArray) {
@ -136,7 +75,7 @@ public class SubscriptionUtils {
// long stamp = lock.tryOptimisticRead();
//
// if (stamp > 0) {
// ArrayList<Class<?>> arrayList = local.get(clazz);
// ArrayList<Class<?>> arrayList = local.getSubscriptions(clazz);
// if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY);
//
@ -145,7 +84,7 @@ public class SubscriptionUtils {
// } else {
// stamp = lock.readLock();
//
// arrayList = local.get(clazz);
// arrayList = local.getSubscriptions(clazz);
// if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY);
// lock.unlockRead(stamp);
@ -155,7 +94,7 @@ public class SubscriptionUtils {
// }
// }
//
// // unable to get a valid subscription. Have to acquire a write lock
// // unable to getSubscriptions a valid subscription. Have to acquire a write lock
// long origStamp = stamp;
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// lock.unlockRead(origStamp);
@ -163,7 +102,7 @@ public class SubscriptionUtils {
// }
//
//
// // get all super types of class
// // getSubscriptions all super types of class
// Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
// ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size());
// Iterator<Class<?>> iterator;
@ -189,89 +128,51 @@ public class SubscriptionUtils {
// return classes;
// }
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public final Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> arrayVersionCache = this.arrayVersionCache;
Class<?> clazz = arrayVersionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
arrayVersionCache.put(c, clazz);
}
return clazz;
}
/**
* Cache the values of JNI method, isArray(c)
* @return true if the class c is an array type
*/
// @SuppressWarnings("boxing")
public final boolean isArray(final Class<?> c) {
// final Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
//
// final Boolean isArray = isArrayCache.get(c);
// if (isArray == null) {
boolean b = c.isArray();
// isArrayCache.put(c, b);
return b;
// }
// return isArray;
}
public void shutdown() {
this.isArrayCache.clear();
this.arrayVersionCache.clear();
this.superClassesCache.clear();
this.superClass.shutdown();
}
private static Subscription[] EMPTY = new Subscription[0];
private static Class<?>[] EMPTY2 = new Class<?>[0];
private StampedLock superSubLock = new StampedLock();
/**
* Returns an array COPY of the super subscriptions for the specified type.
*
* <p>
* This ALSO checks to see if the superClass accepts subtypes.
*
* <p>
* protected by read lock by caller
*
* @return CAN NOT RETURN NULL
*/
public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> superType) {
public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
ArrayList<Subscription> superSubscriptions = local.get(superType);
ArrayList<Subscription> superSubscriptions = local.get(clazz);
if (superSubscriptions == null) {
final Class<?>[] superClasses = getSuperClasses_NL(superType, isArray(superType)); // never returns null, cached response
final int length = superClasses.length;
// types was not empty, so get subscriptions for each type and collate them
// types was not empty, so getSubscriptions subscriptions for each type and collate them
final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Class<?> superClass;
ArrayList<Subscription> subs;
// save the subscriptions
final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz, clazz.isArray()); // never returns null, cached response
Class<?> superClass;
ArrayList<Subscription> superSubs;
Subscription sub;
final int length = superClasses.length;
int superSubLengh;
superSubscriptions = new ArrayList<Subscription>(length);
for (int i=0;i<length;i++) {
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
subs = local2.get(superClass);
superSubs = local2.get(superClass);
if (subs != null) {
for (int j=0;j<subs.size();j++) {
sub = subs.get(j);
if (superSubs != null) {
superSubLengh = superSubs.size();
for (int j = 0; j < superSubLengh; j++) {
sub = superSubs.get(j);
if (sub.acceptsSubtypes()) {
superSubscriptions.add(sub);
@ -280,7 +181,7 @@ public class SubscriptionUtils {
}
}
local.put(superType, superSubscriptions);
local.put(clazz, superSubscriptions);
}
return superSubscriptions;
@ -301,7 +202,7 @@ public class SubscriptionUtils {
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
// subsPerType = subHolderSingle.getSubscriptions();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
@ -387,7 +288,7 @@ public class SubscriptionUtils {
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
// subsPerType = subHolderSingle.getSubscriptions();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);

View File

@ -1,30 +1,12 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.listeners.*;
import dorkbox.util.messagebus.messages.*;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import org.junit.Test;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.SubscriptionValidator;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.listeners.AbstractMessageListener;
import dorkbox.util.messagebus.listeners.ICountableListener;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.IMultipartMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.MultipartMessageListener;
import dorkbox.util.messagebus.listeners.Overloading;
import dorkbox.util.messagebus.listeners.StandardMessageListener;
import dorkbox.util.messagebus.messages.AbstractMessage;
import dorkbox.util.messagebus.messages.ICountable;
import dorkbox.util.messagebus.messages.IMessage;
import dorkbox.util.messagebus.messages.IMultipartMessage;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
*
* Test the subscriptions as generated and organized by the subscription manager. Tests use different sets of listeners
* and corresponding expected set of subscriptions that should result from subscribing the listeners. The subscriptions
* are tested for the type of messages they should handle and
@ -36,149 +18,124 @@ public class SubscriptionManagerTest extends AssertSupport {
private static final int InstancesPerListener = 5000;
@Test
public void testIMessageListener(){
ListenerFactory listeners = listeners(
IMessageListener.DefaultListener.class,
IMessageListener.DisabledListener.class,
IMessageListener.NoSubtypesListener.class);
@Test public void testIMessageListener() {
ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class, IMessageListener.DisabledListener.class,
IMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMessageListener.DefaultListener.class).handles(IMessage.class,
AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(IMessageListener.DefaultListener.class)
.handles(IMessage.class, AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testAbstractMessageListener(){
ListenerFactory listeners = listeners(
AbstractMessageListener.DefaultListener.class,
AbstractMessageListener.DisabledListener.class,
AbstractMessageListener.NoSubtypesListener.class);
@Test public void testAbstractMessageListener() {
ListenerFactory listeners = listeners(AbstractMessageListener.DefaultListener.class, AbstractMessageListener.DisabledListener.class,
AbstractMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class)
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class);
.listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class)
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMessagesListener(){
ListenerFactory listeners = listeners(
MessagesListener.DefaultListener.class,
MessagesListener.DisabledListener.class,
MessagesListener.NoSubtypesListener.class);
@Test public void testMessagesListener() {
ListenerFactory listeners = listeners(MessagesListener.DefaultListener.class, MessagesListener.DisabledListener.class,
MessagesListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
.listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMultipartMessageListener(){
ListenerFactory listeners = listeners(
MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.DisabledListener.class,
MultipartMessageListener.NoSubtypesListener.class);
@Test public void testMultipartMessageListener() {
ListenerFactory listeners = listeners(MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.DisabledListener.class,
MultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class);
.listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testIMultipartMessageListener(){
ListenerFactory listeners = listeners(
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class,
IMultipartMessageListener.NoSubtypesListener.class);
@Test public void testIMultipartMessageListener() {
ListenerFactory listeners = listeners(IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class,
IMultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
.listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testStandardMessageListener(){
ListenerFactory listeners = listeners(
StandardMessageListener.DefaultListener.class,
StandardMessageListener.DisabledListener.class,
StandardMessageListener.NoSubtypesListener.class);
@Test public void testStandardMessageListener() {
ListenerFactory listeners = listeners(StandardMessageListener.DefaultListener.class, StandardMessageListener.DisabledListener.class,
StandardMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class);
.listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testICountableListener(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.DisabledListener.class,
ICountableListener.NoSubtypesListener.class);
@Test public void testICountableListener() {
ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class,
ICountableListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class).handles(ICountable.class)
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
.listener(ICountableListener.DefaultListener.class).handles(ICountable.class)
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMultipleMessageListeners(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.DisabledListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class,
MessagesListener.DefaultListener.class,
MessagesListener.DisabledListener.class);
@Test public void testMultipleMessageListeners() {
ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class, MessagesListener.DefaultListener.class,
MessagesListener.DisabledListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testOverloadedMessageHandlers(){
ListenerFactory listeners = listeners(
Overloading.ListenerBase.class,
Overloading.ListenerSub.class);
@Test public void testOverloadedMessageHandlers() {
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(Overloading.ListenerBase.class).handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class)
.listener(Overloading.ListenerSub.class).handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class)
.handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class).listener(Overloading.ListenerSub.class)
.handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class);
runTestWith(listeners, expectedSubscriptions);
}
private ListenerFactory listeners(Class<?> ...listeners){
private ListenerFactory listeners(Class<?>... listeners) {
ListenerFactory factory = new ListenerFactory();
for (Class<?> listener : listeners){
for (Class<?> listener : listeners) {
factory.create(InstancesPerListener, listener);
}
return factory;
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);

View File

@ -1,12 +1,11 @@
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.MessageBusTest;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Todo: Add javadoc
@ -49,7 +48,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Synchronized
public void handleMessage(Object o){
counter.getAndIncrement();
// System.err.println(counter.get());
// System.err.println(counter.getSubscriptions());
}
}
}

View File

@ -1,22 +1,15 @@
package dorkbox.util.messagebus.common;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.*;
/**
*
* @author bennidi
* Date: 5/25/13
*/
public class SubscriptionValidator extends AssertSupport{
* @author bennidi
* Date: 5/25/13
*/
public class SubscriptionValidator extends AssertSupport {
private List<ValidationEntry> validations = new LinkedList<ValidationEntry>();
@ -27,11 +20,11 @@ public class SubscriptionValidator extends AssertSupport{
this.subscribedListener = subscribedListener;
}
public Expectation listener(Class subscriber){
public Expectation listener(Class subscriber) {
return new Expectation(subscriber);
}
private SubscriptionValidator expect(Class subscriber, Class messageType){
private SubscriptionValidator expect(Class subscriber, Class messageType) {
this.validations.add(new ValidationEntry(messageType, subscriber));
this.messageTypes.add(messageType);
return this;
@ -39,8 +32,8 @@ public class SubscriptionValidator extends AssertSupport{
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager){
for (Class messageType : this.messageTypes){
public void validate(SubscriptionManager manager) {
for (Class messageType : this.messageTypes) {
Collection<ValidationEntry> validationEntries = getEntries(messageType);
// we split subs + superSubs into TWO calls.
@ -53,11 +46,11 @@ public class SubscriptionValidator extends AssertSupport{
assertEquals(validationEntries.size(), collection.size());
for(ValidationEntry validationValidationEntry : validationEntries){
for (ValidationEntry validationValidationEntry : validationEntries) {
Subscription matchingSub = null;
// one of the subscriptions must belong to the subscriber type
for(Subscription sub : collection){
if(belongsTo(sub, validationValidationEntry.subscriber)){
for (Subscription sub : collection) {
if (belongsTo(sub, validationValidationEntry.subscriber)) {
matchingSub = sub;
break;
}
@ -92,11 +85,11 @@ public class SubscriptionValidator extends AssertSupport{
// }
private Collection<ValidationEntry> getEntries(Class<?> messageType){
private Collection<ValidationEntry> getEntries(Class<?> messageType) {
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
for (ValidationEntry validationValidationEntry : this.validations){
for (ValidationEntry validationValidationEntry : this.validations) {
if(validationValidationEntry.messageType.equals(messageType)) {
if (validationValidationEntry.messageType.equals(messageType)) {
matching.add(validationValidationEntry);
}
}
@ -105,8 +98,7 @@ public class SubscriptionValidator extends AssertSupport{
public class Expectation{
public class Expectation {
private Class listener;
@ -114,14 +106,15 @@ public class SubscriptionValidator extends AssertSupport{
this.listener = listener;
}
public SubscriptionValidator handles(Class ...messages){
for(Class message : messages) {
public SubscriptionValidator handles(Class... messages) {
for (Class message : messages) {
expect(this.listener, message);
}
return SubscriptionValidator.this;
}
}
private class ValidationEntry {

View File

@ -1,11 +1,11 @@
package dorkbox.util.messagebus.common;
import java.util.Iterator;
import java.util.List;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.PubSubSupport;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.Iterator;
import java.util.List;
/**
* Todo: Add javadoc
@ -16,52 +16,48 @@ import dorkbox.util.messagebus.SubscriptionManager;
public class TestUtil {
public static Runnable subscriber(final SubscriptionManager manager, final ListenerFactory listeners){
public static Runnable subscriber(final SubscriptionManager manager, final ListenerFactory listeners) {
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
@Override public void run() {
Object next;
while((next = source.next()) != null){
while ((next = source.next()) != null) {
manager.subscribe(next);
}
}
};
}
public static Runnable unsubscriber(final SubscriptionManager manager, final ListenerFactory listeners){
public static Runnable unsubscriber(final SubscriptionManager manager, final ListenerFactory listeners) {
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
@Override public void run() {
Object next;
while((next = source.next()) != null){
while ((next = source.next()) != null) {
manager.unsubscribe(next);
}
}
};
}
public static Runnable subscriber(final PubSubSupport bus, final ListenerFactory listeners){
public static Runnable subscriber(final PubSubSupport bus, final ListenerFactory listeners) {
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
@Override public void run() {
Object next;
while((next = source.next()) != null){
while ((next = source.next()) != null) {
bus.subscribe(next);
}
}
};
}
public static Runnable unsubscriber(final PubSubSupport bus, final ListenerFactory listeners){
public static Runnable unsubscriber(final PubSubSupport bus, final ListenerFactory listeners) {
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
@Override public void run() {
Object next;
while((next = source.next()) != null){
while ((next = source.next()) != null) {
bus.unsubscribe(next);
}
}
@ -71,28 +67,25 @@ public class TestUtil {
public static void setup(final PubSubSupport bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;
if(listeners.size() >= numberOfThreads){
partitionSize = (int)Math.floor(listeners.size() / numberOfThreads);
if (listeners.size() >= numberOfThreads) {
partitionSize = (int) Math.floor(listeners.size() / numberOfThreads);
}
else{
else {
partitionSize = 1;
numberOfThreads = listeners.size();
}
for(int i = 0; i < numberOfThreads; i++){
for (int i = 0; i < numberOfThreads; i++) {
final int partitionStart = i * partitionSize;
final int partitionEnd = i+1 < numberOfThreads
? partitionStart + partitionSize + 1
: listeners.size();
final int partitionEnd = i + 1 < numberOfThreads ? partitionStart + partitionSize + 1 : listeners.size();
setupUnits[i] = new Runnable() {
private List<Object> listenerSubset = listeners.subList(partitionStart, partitionEnd);
@Override
public void run() {
for(Object listener : this.listenerSubset){
bus.subscribe(listener);
}
@Override public void run() {
for (Object listener : this.listenerSubset) {
bus.subscribe(listener);
}
}
};