stamped lock, switched to ConcurrentSet+CLQ

This commit is contained in:
nathan 2015-05-20 01:08:12 +02:00
parent a4924e58be
commit 9a787506ee
15 changed files with 2204 additions and 593 deletions

View File

@ -1,19 +1,17 @@
package dorkbox.util.messagebus;
import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import org.jctools.util.Pow2;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.common.thread.BooleanHolder;
import dorkbox.util.messagebus.common.thread.BooleanThreadHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
@ -157,21 +155,6 @@ public class MultiMBassador implements IMessageBus {
}
}
@Override
public void subscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.subscribe(listener);
}
@Override
public void unsubscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.unsubscribe(listener);
}
@Override
public final boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public void start() {
for (Thread t : this.threads) {
@ -194,6 +177,21 @@ public class MultiMBassador implements IMessageBus {
this.subscriptionManager.shutdown();
}
@Override
public void subscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.subscribe(listener);
}
@Override
public void unsubscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.unsubscribe(listener);
}
@Override
public final boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
private final BooleanThreadHolder booleanThreadLocal = new BooleanThreadHolder();
@Override
@ -201,70 +199,80 @@ public class MultiMBassador implements IMessageBus {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
ConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null
BooleanHolder subsPublished = this.booleanThreadLocal.get();
subsPublished.bool = false;
Iterator<Subscription> iterator;
Subscription sub;
// Run subscriptions
if (subscriptions != null) {
for (Subscription sub : subscriptions) {
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
// this catches all exception types
sub.publishToSubscription(this, subsPublished, message);
}
}
if (!this.forceExactMatches) {
ConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses
if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) {
for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
// this catches all exception types
sub.publishToSubscription(this, subsPublished, message);
}
}
// publish to var arg, only if not already an array
if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
Object[] asArray = null;
ConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
for (Subscription sub : varargSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, subsPublished, asArray);
}
}
ConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
}
for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, subsPublished, asArray);
}
}
}
// // publish to var arg, only if not already an array
// if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
// Object[] asArray = null;
//
// ConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
// if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
// asArray[0] = message;
//
// for (iterator = varargSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, asArray);
// }
// }
//
// ConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// // now get array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
// asArray[0] = message;
// }
// for (iterator = varargSuperSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, asArray);
// }
// }
// }
}
if (!subsPublished.bool) {
// Dead Event must EXACTLY MATCH (no subclasses)
ConcurrentSet<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub2 : deadSubscriptions) {
for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
// this catches all exception types
sub2.publishToSubscription(this, subsPublished, deadMessage);
sub.publishToSubscription(this, subsPublished, deadMessage);
}
}
}

View File

@ -1,13 +1,12 @@
package dorkbox.util.messagebus;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.SubscriptionUtils;
import dorkbox.util.messagebus.common.VarArgPossibility;
import dorkbox.util.messagebus.common.VarArgUtils;
@ -51,8 +50,8 @@ public class SubscriptionManager {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerMessageMulti;
private final ConcurrentMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
@ -61,12 +60,6 @@ public class SubscriptionManager {
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerListener;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptionsMulti;
private final VarArgUtils varArgUtils;
// stripe size of maps for concurrency
@ -75,36 +68,31 @@ public class SubscriptionManager {
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads;
this.utils = new SubscriptionUtils(LOAD_FACTOR, numberOfThreads);
float loadFactor = SubscriptionManager.LOAD_FACTOR;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, this.STRIPE_SIZE);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(32, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(4, loadFactor, this.STRIPE_SIZE);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, loadFactor);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(32, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor, this.STRIPE_SIZE);
}
// modified by N threads
{
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(32, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
}
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE);
this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.subHolderSingle = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE);
this.subHolderConcurrent = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE);
}
public void shutdown() {
@ -146,7 +134,7 @@ public class SubscriptionManager {
ConcurrentSet<Subscription> subsPerListener = subsPerListener2.get(listenerClass);
if (subsPerListener == null) {
// a listener is subscribed for the first time
StrongConcurrentSet<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass, LOAD_FACTOR, this.STRIPE_SIZE).getHandlers();
int handlersSize = messageHandlers.size();
if (handlersSize == 0) {
@ -157,15 +145,15 @@ public class SubscriptionManager {
VarArgPossibility varArgPossibility = this.varArgPossibility;
subsPerListener = new ConcurrentSet<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
ISetEntry<MessageHandler> current = messageHandlers.head;
Iterator<MessageHandler> iterator;
MessageHandler messageHandler;
while (current != null) {
messageHandler = current.getValue();
current = current.next();
Collection<Subscription> subsPerType = null;
for (iterator = messageHandlers.iterator(); iterator.hasNext();) {
messageHandler = iterator.next();
ConcurrentSet<Subscription> subsPerType = null;
// now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages();
@ -176,12 +164,13 @@ public class SubscriptionManager {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get();
ConcurrentSet<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType);
Collection<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
boolean isArray = this.utils.isArray(types[0]);
// cache the super classes
this.utils.getSuperClasses(types[0], isArray);
if (isArray) {
varArgPossibility.set(true);
@ -194,11 +183,12 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
ConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]);
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
}
@ -209,11 +199,12 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
ConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]);
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
this.utils.getSuperClasses(types[2]);
@ -225,7 +216,7 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
ConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types);
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
@ -253,8 +244,11 @@ public class SubscriptionManager {
subsPerListener2.put(listenerClass, subsPerListener);
}
} else {
// subscriptions already exist and must only be updated
for (Subscription sub : subsPerListener) {
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = subsPerListener.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.subscribe(listener);
}
}
@ -276,9 +270,13 @@ public class SubscriptionManager {
clearConcurrentCollections();
synchronized(listenerClass) {
ConcurrentSet<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription sub : subscriptions) {
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.unsubscribe(listener);
}
}
@ -286,23 +284,23 @@ public class SubscriptionManager {
}
private void clearConcurrentCollections() {
this.superClassSubscriptions.clear();
this.utils.clear();
this.varArgUtils.clear();
}
// CAN RETURN NULL
public final ConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
}
// CAN RETURN NULL
public final ConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.get(messageType1, messageType2);
}
// CAN RETURN NULL
public final ConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
@ -321,263 +319,40 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) {
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) {
return this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
return this.varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2);
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
public Collection<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return this.varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public final ConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType) {
// whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptions;
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
StrongConcurrentSet<Class<?>> types = this.utils.getSuperClasses(superType);
if (types.isEmpty()) {
return subsPerType;
}
Map<Class<?>, ConcurrentSet<Subscription>> local2 = this.subscriptionsPerMessageSingle;
ISetEntry<Class<?>> current1 = null;
Class<?> superClass;
current1 = types.head;
while (current1 != null) {
superClass = current1.getValue();
current1 = current1.next();
ConcurrentSet<Subscription> subs = local2.get(superClass);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
return subsPerType;
} else {
// someone beat us
return putIfAbsent;
}
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
return this.utils.getSuperSubscriptions(superType);
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public ConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
ConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
// whenever our subscriptions change, this map is cleared.
StrongConcurrentSet<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSet<Class<?>> types2 = this.utils.getSuperClasses(superType2);
ConcurrentSet<Subscription> subs;
HashMapTree<Class<?>, ConcurrentSet<Subscription>> leaf1;
HashMapTree<Class<?>, ConcurrentSet<Subscription>> leaf2;
ISetEntry<Class<?>> current1 = null;
Class<?> eventSuperType1;
ISetEntry<Class<?>> current2 = null;
Class<?> eventSuperType2;
if (types1 != null) {
current1 = types1.head;
}
while (current1 != null) {
eventSuperType1 = current1.getValue();
current1 = current1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
if (types2 != null) {
current2 = types2.head;
}
while (current2 != null) {
eventSuperType2 = current2.getValue();
current2 = current2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
return this.utils.getSuperSubscriptions(superType1, superType2);
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public ConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
ConcurrentSet<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
StrongConcurrentSet<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSet<Class<?>> types2 = this.utils.getSuperClasses(superType2);
StrongConcurrentSet<Class<?>> types3 = this.utils.getSuperClasses(superType3);
ConcurrentSet<Subscription> subs;
HashMapTree<Class<?>, ConcurrentSet<Subscription>> leaf1;
HashMapTree<Class<?>, ConcurrentSet<Subscription>> leaf2;
HashMapTree<Class<?>, ConcurrentSet<Subscription>> leaf3;
ISetEntry<Class<?>> current1 = null;
Class<?> eventSuperType1;
ISetEntry<Class<?>> current2 = null;
Class<?> eventSuperType2;
ISetEntry<Class<?>> current3 = null;
Class<?> eventSuperType3;
if (types1 != null) {
current1 = types1.head;
}
while (current1 != null) {
eventSuperType1 = current1.getValue();
current1 = current1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
if (types2 != null) {
current2 = types2.head;
}
while (current2 != null) {
eventSuperType2 = current2.getValue();
current2 = current2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2;
if (type12Matches) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
if (types3 != null) {
current3 = types3.head;
}
while (current3 != null) {
eventSuperType3 = current3.getValue();
current3 = current3.next();
if (type12Matches && eventSuperType3 == superType3) {
continue;
}
leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) {
subs = leaf3.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
return this.utils.getSuperSubscriptions(superType1, superType2, superType3);
}
}

View File

@ -4,9 +4,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
import dorkbox.util.messagebus.common.thread.StampedLock;
abstract class pad<T> extends item<T> {
@ -27,7 +26,7 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
private final transient long ID = id.getAndIncrement();
// Internal state
protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
protected final StampedLock lock = new StampedLock();
private final transient Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
volatile long y0, y1, y2, y4, y5, y6 = 7L;
@ -47,25 +46,38 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
}
boolean changed = false;
Lock writeLock = this.lock.writeLock();
writeLock.lock();
long stamp = this.lock.readLock();
if (this.entries.containsKey(element)) {
this.lock.unlockRead(stamp);
return false;
}
long newStamp = 0L;
while ((newStamp = this.lock.tryConvertToWriteLock(stamp)) == 0) {
this.lock.unlockRead(stamp);
stamp = this.lock.writeLock();
}
stamp = newStamp;
changed = insert(element);
writeLock.unlock();
this.lock.unlock(stamp);
return changed;
}
@Override
public boolean contains(Object element) {
Lock readLock = this.lock.readLock();
ISetEntry<T> entry;
try {
readLock.lock();
entry = this.entries.get(element);
long stamp = this.lock.tryOptimisticRead();
} finally {
readLock.unlock();
ISetEntry<T> entry = this.entries.get(element);
if (!this.lock.validate(stamp)) {
stamp = this.lock.readLock();
entry = this.entries.get(element);
this.lock.unlockRead(stamp);
}
return entry != null && entry.getValue() != null;
}
@ -90,18 +102,21 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
@Override
public boolean addAll(Collection<? extends T> elements) {
StampedLock lock = this.lock;
boolean changed = false;
Lock writeLock = this.lock.writeLock();
long stamp = lock.writeLock();
try {
writeLock.lock();
for (T element : elements) {
if (element != null) {
changed |= insert(element);
}
}
} finally {
writeLock.unlock();
lock.unlockWrite(stamp);
}
return changed;
}
@ -110,33 +125,35 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
*/
@Override
public boolean remove(Object element) {
StampedLock lock = this.lock;
long stamp = lock.tryOptimisticRead();
Lock updateLock = this.lock.updateLock();
try {
updateLock.lock();
ISetEntry<T> entry = this.entries.get(element);
ISetEntry<T> entry = this.entries.get(element);
if (entry != null && entry.getValue() != null) {
Lock writeLock = this.lock.writeLock();
try {
writeLock.lock();
if (entry != this.head) {
entry.remove();
} else {
// if it was second, now it's first
this.head = this.head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
this.entries.remove(element);
return true;
} finally {
writeLock.unlock();
if (!lock.validate(stamp)) {
stamp = lock.readLock();
entry = this.entries.get(element);
lock.unlockRead(stamp);
}
if (entry == null || entry.getValue() == null) {
return false; // fast exit
} else {
stamp = lock.writeLock();
try {
if (entry != this.head) {
entry.remove();
} else {
// if it was second, now it's first
this.head = this.head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
} else {
return false; // fast exit
this.entries.remove(element);
return true;
} finally {
lock.unlockWrite(stamp);
}
} finally {
updateLock.unlock();
}
}
@ -146,7 +163,7 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
}
@Override
public <T> T[] toArray(T[] a) {
public <T2> T2[] toArray(T2[] a) {
return this.entries.entrySet().toArray(a);
}
@ -167,14 +184,12 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
@Override
public void clear() {
Lock writeLock = this.lock.writeLock();
try {
writeLock.lock();
this.head = null;
this.entries.clear();
} finally {
writeLock.unlock();
}
StampedLock lock = this.lock;
long stamp = lock.writeLock();
this.head = null;
this.entries.clear();
lock.unlockWrite(stamp);
}
@Override
@ -196,6 +211,7 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("rawtypes")
AbstractConcurrentSet other = (AbstractConcurrentSet) obj;
if (this.ID != other.ID) {
return false;

View File

@ -4,8 +4,10 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/**
* @author bennidi
@ -16,13 +18,13 @@ import dorkbox.util.messagebus.annotations.Handler;
*/
public class ReflectionUtils {
public static StrongConcurrentSetV8<Method> getMethods(Class<?> target) {
StrongConcurrentSetV8<Method> hashSet = new StrongConcurrentSetV8<Method>(16, .8F);
public static Collection<Method> getMethods(Class<?> target) {
Collection<Method> hashSet = new ConcurrentSet<Method>(16, .8F, 1);
getMethods(target, hashSet);
return hashSet;
}
private static void getMethods(Class<?> target, StrongConcurrentSetV8<Method> methods) {
private static void getMethods(Class<?> target, Collection<Method> methods) {
try {
for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) {
@ -66,8 +68,8 @@ public class ReflectionUtils {
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static StrongConcurrentSetV8<Class<?>> getSuperTypes(Class<?> from) {
StrongConcurrentSetV8<Class<?>> superclasses = new StrongConcurrentSetV8<Class<?>>(8, 0.8F);
public static Collection<Class<?>> getSuperTypes(Class<?> from) {
Collection<Class<?>> superclasses = new ConcurrentSet<Class<?>>(8, 0.8F, 1);
collectInterfaces( from, superclasses );
@ -79,21 +81,19 @@ public class ReflectionUtils {
return superclasses;
}
public static void collectInterfaces( Class<?> from, StrongConcurrentSetV8<Class<?>> accumulator ) {
public static void collectInterfaces( Class<?> from, Collection<Class<?>> accumulator ) {
for ( Class<?> intface : from.getInterfaces() ) {
accumulator.add( intface );
collectInterfaces( intface, accumulator );
}
}
//
public static boolean containsOverridingMethod(final StrongConcurrentSetV8<Method> allMethods, final Method methodToCheck) {
ISetEntry<Method> current = allMethods.head;
public static boolean containsOverridingMethod(final Collection<Method> allMethods, final Method methodToCheck) {
Iterator<Method> iterator;
Method method;
while (current != null) {
method = current.getValue();
current = current.next();
for (iterator = allMethods.iterator(); iterator.hasNext();) {
method = iterator.next();
if (isOverriddenBy(methodToCheck, method)) {
return true;
@ -131,7 +131,7 @@ public class ReflectionUtils {
}
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType) {
A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8<AnnotatedElement>(16, .8F));
A annotation = getAnnotation(from, annotationType, new ConcurrentSet<AnnotatedElement>(16, .8F, 1));
return annotation;
}

View File

@ -1,26 +1,61 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionUtils {
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, StrongConcurrentSet<Class<?>>> superClassesCache;
private final ConcurrentMap<Class<?>, ConcurrentSet<Class<?>>> superClassesCache;
private final ClassHolder classHolderSingle;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptionsMulti;
public SubscriptionUtils(float loadFactor, int stripeSize) {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, loadFactor, stripeSize);
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Class<?>>>(64, loadFactor, stripeSize);
this.classHolderSingle = new ClassHolder(loadFactor);
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
public SubscriptionUtils(Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti,
float loadFactor, int stripeSize) {
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Class<?>>>(32, loadFactor, stripeSize);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(32, loadFactor, stripeSize);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor);
this.subHolderSingle = new SubscriptionHolder(loadFactor, stripeSize);
this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize);
}
public void clear() {
this.superClassSubscriptions.clear();
}
/**
@ -28,31 +63,29 @@ public class SubscriptionUtils {
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/
public StrongConcurrentSet<Class<?>> getSuperClasses(Class<?> clazz) {
public Collection<Class<?>> getSuperClasses(Class<?> clazz) {
return getSuperClasses(clazz, isArray(clazz));
}
public final StrongConcurrentSet<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
public final Collection<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, StrongConcurrentSet<Class<?>>> local = this.superClassesCache;
ConcurrentMap<Class<?>, ConcurrentSet<Class<?>>> local = this.superClassesCache;
ClassHolder classHolderSingle = this.classHolderSingle;
StrongConcurrentSet<Class<?>> classes = classHolderSingle.get();
ConcurrentSet<Class<?>> classes = classHolderSingle.get();
StrongConcurrentSet<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
ConcurrentSet<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
if (putIfAbsent == null) {
// we are the first one in the map
classHolderSingle.set(classHolderSingle.initialValue());
// it doesn't matter if concurrent access stomps on values, since they are always the same.
StrongConcurrentSet<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ISetEntry<Class<?>> current = superTypes.head;
Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
Iterator<Class<?>> iterator;
Class<?> c;
while (current != null) {
c = current.getValue();
current = current.next();
for (iterator = superTypes.iterator(); iterator.hasNext();) {
c = iterator.next();
if (isArray) {
c = getArrayClass(c);
}
@ -111,4 +144,239 @@ public class SubscriptionUtils {
this.superClassesCache.clear();
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
// whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptions;
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
Collection<Class<?>> types = getSuperClasses(superType);
if (types.isEmpty()) {
return subsPerType;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Class<?> superClass;
Iterator<Class<?>> iterator;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator = types.iterator(); iterator.hasNext();) {
superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
return subsPerType;
} else {
// someone beat us
return putIfAbsent;
}
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
ConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
// whenever our subscriptions change, this map is cleared.
Collection<Class<?>> types1 = getSuperClasses(superType1);
if (types1 != null) {
Collection<Class<?>> types2 = getSuperClasses(superType2);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Iterator<Class<?>> iterator1;
Iterator<Class<?>> iterator2;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator1 = types1.iterator(); iterator1.hasNext();) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null && types2 != null) {
for (iterator2 = types2.iterator(); iterator2.hasNext();) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
ConcurrentSet<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
Collection<Class<?>> types1 = getSuperClasses(superType1);
if (types1 != null) {
Collection<Class<?>> types2 = getSuperClasses(superType2);
Collection<Class<?>> types3 = getSuperClasses(superType3);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
HashMapTree<Class<?>, Collection<Subscription>> leaf3;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Class<?> eventSuperType3;
Iterator<Class<?>> iterator1;
Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator1 = types1.iterator(); iterator1.hasNext();) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null && types2 != null) {
for (iterator2 = types2.iterator(); iterator2.hasNext();) {
eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2;
if (type12Matches) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null && types3 != null) {
for (iterator3 = types3.iterator(); iterator3.hasNext();) {
eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) {
continue;
}
leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) {
subs = leaf3.getValue();
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
}

View File

@ -1,5 +1,7 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -18,10 +20,10 @@ public class VarArgUtils {
private final int stripeSize;
private final SubscriptionUtils utils;
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerMessageSingle;
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(SubscriptionUtils utils, ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerMessageSingle,
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle,
float loadFactor, int stripeSize) {
this.utils = utils;
@ -63,9 +65,13 @@ public class VarArgUtils {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
ConcurrentSet<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
Iterator<Subscription> iterator;
Subscription sub;
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
if (subs != null) {
for (Subscription sub : subs) {
for (iterator = subs.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
@ -98,24 +104,27 @@ public class VarArgUtils {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
StrongConcurrentSet<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
Collection<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
if (types.isEmpty()) {
return null;
return subsPerType;
}
Map<Class<?>, ConcurrentSet<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
ISetEntry<Class<?>> current1;
Iterator<Class<?>> iterator;
Class<?> superClass;
current1 = types.head;
while (current1 != null) {
superClass = current1.getValue();
current1 = current1.next();
Iterator<Subscription> subIterator;
Subscription sub;
ConcurrentSet<Subscription> subs = local2.get(superClass);
for (iterator = types.iterator(); iterator.hasNext();) {
superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null) {
for (Subscription sub : subs) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
@ -159,7 +168,11 @@ public class VarArgUtils {
ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
for (Subscription sub : varargSuperSubscriptions1) {
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
@ -202,7 +215,11 @@ public class VarArgUtils {
ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
ConcurrentSet<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
for (Subscription sub : varargSuperSubscriptions1) {
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
subsPerType.add(sub);
}

View File

@ -3,7 +3,8 @@ package dorkbox.util.messagebus.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
import java.util.concurrent.locks.Lock;
import dorkbox.util.messagebus.common.thread.StampedLock;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of
@ -34,9 +35,10 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
// until it finds the first entry whose value has not yet been garbage collected
// the method assumes that the current element is already orphaned and will remove it
private void removeOrphans(){
Lock writelock = WeakConcurrentSet.this.lock.writeLock();
StampedLock lock = WeakConcurrentSet.this.lock;
long stamp = lock.writeLock();
try{
writelock.lock();
do {
ISetEntry<T> orphaned = this.current;
this.current = this.current.next();
@ -44,7 +46,7 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
} while(this.current != null && this.current.getValue() == null);
}
finally {
writelock.unlock();
lock.unlockWrite(stamp);
}
}

View File

@ -1,21 +1,22 @@
package dorkbox.util.messagebus.common.thread;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
public class ClassHolder extends ThreadLocal<StrongConcurrentSetV8<Class<?>>> {
public class ClassHolder extends ThreadLocal<ConcurrentSet<Class<?>>> {
private final float loadFactor;
private final int stripeSize;
public ClassHolder(float loadFactor) {
public ClassHolder(float loadFactor, int stripeSize) {
super();
this.loadFactor = loadFactor;
this.stripeSize = stripeSize;
}
@Override
public StrongConcurrentSetV8<Class<?>> initialValue() {
return new StrongConcurrentSetV8<Class<?>>(16, this.loadFactor);
public ConcurrentSet<Class<?>> initialValue() {
return new ConcurrentSet<Class<?>>(16, this.loadFactor, this.stripeSize);
}
}

View File

@ -94,7 +94,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
}
Node<T> pred = null;
for (Node<T> p = node; p != null; p = succ(p)) {
for (Node<T> p = this.head; p != null; p = succ(p)) {
T item = p.item;
if (item != null &&
element.equals(item) &&

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,9 @@
package dorkbox.util.messagebus.listener;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import java.util.Collection;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/**
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message
@ -18,11 +21,11 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
*/
public class MessageListener {
private final StrongConcurrentSetV8<MessageHandler> handlers;
private final Collection<MessageHandler> handlers;
private Class<?> listenerDefinition;
public MessageListener(Class<?> listenerDefinition, int size) {
this.handlers = new StrongConcurrentSetV8<MessageHandler>(size, 0.8F);
public MessageListener(Class<?> listenerDefinition, int size, float loadFactor, int stripeSize) {
this.handlers = new ConcurrentSet<MessageHandler>(size, loadFactor, stripeSize);
this.listenerDefinition = listenerDefinition;
}
@ -35,7 +38,7 @@ public class MessageListener {
return this.handlers.add(messageHandler);
}
public StrongConcurrentSetV8<MessageHandler> getHandlers() {
public Collection<MessageHandler> getHandlers() {
return this.handlers;
}
}

View File

@ -1,11 +1,12 @@
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/**
* The meta data reader is responsible for parsing and validating message handler configurations.
@ -19,35 +20,31 @@ public class MetadataReader {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public MessageListener getMessageListener(Class<?> target) {
public MessageListener getMessageListener(Class<?> target, float loadFactor, int stripeSize) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
StrongConcurrentSetV8<Method> allHandlers = ReflectionUtils.getMethods(target);
Collection<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
StrongConcurrentSetV8<Method> bottomMostHandlers = new StrongConcurrentSetV8<Method>(allHandlers.size(), 0.8F);
ISetEntry<Method> current = allHandlers.head;
Collection<Method> bottomMostHandlers = new ConcurrentSet<Method>(allHandlers.size(), loadFactor, stripeSize);
Iterator<Method> iterator;
Method handler;
while (current != null) {
handler = current.getValue();
current = current.next();
for (iterator = allHandlers.iterator(); iterator.hasNext();) {
handler = iterator.next();
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
bottomMostHandlers.add(handler);
}
}
MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size());
MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size(), loadFactor, stripeSize);
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
current = bottomMostHandlers.head;
while (current != null) {
handler = current.getValue();
current = current.next();
for (iterator = bottomMostHandlers.iterator(); iterator.hasNext();) {
handler = iterator.next();
Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class);
if (handlerConfig == null || !handlerConfig.enabled()) {

View File

@ -1,13 +1,13 @@
package dorkbox.util.messagebus.subscription;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.thread.BooleanHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
@ -39,11 +39,11 @@ public class Subscription {
private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation;
private final StrongConcurrentSet<Object> listeners;
private final ConcurrentSet<Object> listeners;
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSet<Object>();
this.listeners = new ConcurrentSet<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
@ -93,27 +93,22 @@ public class Subscription {
return this.listeners.size();
}
public int count =0;
public int getCount() {
return this.count;
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message) {
StrongConcurrentSet<Object> listeners = this.listeners;
ConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
ISetEntry<Object> current = listeners.head;
Iterator<Object> iterator;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
for (iterator = listeners.iterator(); iterator.hasNext();) {
listener = iterator.next();
try {
invocation.invoke(listener, handler, handleIndex, message);
@ -160,128 +155,128 @@ public class Subscription {
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) {
StrongConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
ISetEntry<Object> current = listeners.head;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " +
message1.getClass() + ", " +
message2.getClass()
+ ". Expected: " + handler.getParameterTypes()[0] + ", " +
handler.getParameterTypes()[1]
)
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
}
}
booleanHolder.bool = true;
}
// StrongConcurrentSet<Object> listeners = this.listeners;
//
// if (!listeners.isEmpty()) {
// MethodAccess handler = this.handlerMetadata.getHandler();
// int handleIndex = this.handlerMetadata.getMethodIndex();
// IHandlerInvocation invocation = this.invocation;
//
//
// ISetEntry<Object> current = listeners.head;
// Object listener;
// while (current != null) {
// listener = current.getValue();
// current = current.next();
//
// try {
// invocation.invoke(listener, handler, handleIndex, message1, message2);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1]
// )
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// }
// }
// booleanHolder.bool = true;
// }
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) {
StrongConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
ISetEntry<Object> current = listeners.head;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " +
message1.getClass() + ", " +
message2.getClass() + ", " +
message3.getClass()
+ ". Expected: " + handler.getParameterTypes()[0] + ", " +
handler.getParameterTypes()[1] + ", " +
handler.getParameterTypes()[2]
)
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
}
}
booleanHolder.bool = true;
}
// StrongConcurrentSet<Object> listeners = this.listeners;
//
// if (!listeners.isEmpty()) {
// MethodAccess handler = this.handlerMetadata.getHandler();
// int handleIndex = this.handlerMetadata.getMethodIndex();
// IHandlerInvocation invocation = this.invocation;
//
//
// ISetEntry<Object> current = listeners.head;
// Object listener;
// while (current != null) {
// listener = current.getValue();
// current = current.next();
//
// try {
// invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass() + ", " +
// message3.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1] + ", " +
// handler.getParameterTypes()[2]
// )
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// }
// }
// booleanHolder.bool = true;
// }
}

View File

@ -26,7 +26,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithoutInheritance() {
MessageListener listener = this.reader.getMessageListener(MessageListener1.class);
MessageListener listener = this.reader.getMessageListener(MessageListener1.class, 0.85F, 4);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -45,7 +45,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithInheritance() {
MessageListener listener = this.reader.getMessageListener(MessageListener2.class);
MessageListener listener = this.reader.getMessageListener(MessageListener2.class, 0.85F, 4);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -55,7 +55,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithInheritanceOverriding() {
MessageListener listener = this.reader.getMessageListener(MessageListener3.class);
MessageListener listener = this.reader.getMessageListener(MessageListener3.class, 0.85F, 4);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(0, String.class)
@ -198,7 +198,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testMultipleSignatureListenerWithoutInheritance() {
MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class);
MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class, 0.85F, 4);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(7, String.class)
.expectHandlers(9, String.class, String.class)

View File

@ -27,11 +27,12 @@ public class PerfTest_Collections {
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final float LOAD_FACTOR = 0.8F;
private static MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class);
private static final MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class, LOAD_FACTOR, 8);
public static void main(final String[] args) throws Exception {
final int size = 16;
System.out.println("reps:" + REPETITIONS + " size: " + size);
// have to warm-up the JVM.
@ -72,8 +73,10 @@ public class PerfTest_Collections {
final int warmupRuns = 2;
final int runs = 3;
Collection<MessageHandler> handlers = messageListener.getHandlers();
for (int i=0;i<size;i++) {
for (MessageHandler x : messageListener.getHandlers()) {
for (MessageHandler x : handlers) {
set.add(new Subscription(x));
}
}