WIP cleanup, unit tests

This commit is contained in:
nathan 2015-02-27 16:32:05 +01:00
parent dd31c45dc3
commit 45614e8bdd
13 changed files with 204 additions and 301 deletions

View File

@ -174,15 +174,13 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass = message.getClass(); Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean publishDeadSubs = true; boolean subsPublished = false;
// Run subscriptions // Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) { if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) { for (Subscription sub : subscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message); subsPublished |= sub.publishToSubscription(this, message);
} }
} }
@ -190,11 +188,9 @@ public class MultiMBassador implements IMessageBus {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass); Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses // now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) { if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : superSubscriptions) { for (Subscription sub : superSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message); subsPublished |= sub.publishToSubscription(this, message);
} }
} }
@ -205,22 +201,18 @@ public class MultiMBassador implements IMessageBus {
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass); Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
asArray = (Object[]) Array.newInstance(messageClass, 1); asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message; asArray[0] = message;
for (Subscription sub : varargSubscriptions) { for (Subscription sub : varargSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) { if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1); asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message; asArray[0] = message;
@ -228,13 +220,13 @@ public class MultiMBassador implements IMessageBus {
for (Subscription sub : varargSuperSubscriptions) { for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
} }
} }
if (publishDeadSubs) { if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
@ -255,15 +247,14 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass2 = message2.getClass(); Class<?> messageClass2 = message2.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
boolean publishDeadSubs = true; boolean subsPublished = false;
// Run subscriptions // Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) { if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) { for (Subscription sub : subscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message1, message2); subsPublished |= sub.publishToSubscription(this, message1, message2);
} }
} }
@ -271,10 +262,9 @@ public class MultiMBassador implements IMessageBus {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// now get superClasses // now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) { if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : superSubscriptions) { for (Subscription sub : superSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message1, message2); subsPublished |= sub.publishToSubscription(this, message1, message2);
} }
} }
@ -284,23 +274,19 @@ public class MultiMBassador implements IMessageBus {
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1; asArray[0] = message1;
asArray[1] = message2; asArray[1] = message2;
for (Subscription sub : varargSubscriptions) { for (Subscription sub : varargSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) { if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1; asArray[0] = message1;
@ -309,14 +295,14 @@ public class MultiMBassador implements IMessageBus {
for (Subscription sub : varargSuperSubscriptions) { for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
} }
} }
if (publishDeadSubs) { if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
@ -338,14 +324,13 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass3 = message3.getClass(); Class<?> messageClass3 = message3.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
boolean publishDeadSubs = true; boolean subsPublished = false;
// Run subscriptions // Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) { if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) { for (Subscription sub : subscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, message1, message2, message3); subsPublished |= sub.publishToSubscription(this, message1, message2, message3);
} }
} }
@ -365,8 +350,6 @@ public class MultiMBassador implements IMessageBus {
Object[] asArray = null; Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1; asArray[0] = message1;
asArray[1] = message2; asArray[1] = message2;
@ -374,15 +357,13 @@ public class MultiMBassador implements IMessageBus {
for (Subscription sub : varargSubscriptions) { for (Subscription sub : varargSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) { if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1; asArray[0] = message1;
@ -392,14 +373,14 @@ public class MultiMBassador implements IMessageBus {
for (Subscription sub : varargSuperSubscriptions) { for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types // this catches all exception types
sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
} }
} }
} }
if (publishDeadSubs) { if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {

View File

@ -5,13 +5,14 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ReflectionUtils; import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SubscriptionHolder;
import dorkbox.util.messagebus.common.SuperClassIterator; import dorkbox.util.messagebus.common.SuperClassIterator;
import dorkbox.util.messagebus.listener.MessageHandler; import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader; import dorkbox.util.messagebus.listener.MetadataReader;
@ -51,18 +52,14 @@ public class SubscriptionManager {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time // write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle; private final ConcurrentMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti; private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// synchronize read/write access to the subscription maps
private final ReentrantLock lock = new ReentrantLock();
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing // this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently // write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change // once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener; private final ConcurrentMap<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final Map<Class<?>, Class<?>> arrayVersionCache; private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Collection<Class<?>>> superClassesCache; private final Map<Class<?>, Collection<Class<?>>> superClassesCache;
@ -79,6 +76,9 @@ public class SubscriptionManager {
// stripe size of maps for concurrency // stripe size of maps for concurrency
private final int STRIPE_SIZE; private final int STRIPE_SIZE;
private final SubscriptionHolder subHolder;
private final SubscriptionHolder subHolderConcurrent;
SubscriptionManager(int numberOfThreads) { SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads; this.STRIPE_SIZE = numberOfThreads;
@ -109,23 +109,24 @@ public class SubscriptionManager {
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
} }
this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1);
this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
} }
public void shutdown() { public void shutdown() {
this.lock.lock(); this.nonListeners.clear();
try {
this.nonListeners.clear(); this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageSingle.clear(); this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerMessageMulti.clear(); this.subscriptionsPerListener.clear();
this.subscriptionsPerListener.clear();
this.arrayVersionCache.clear(); this.superClassesCache.clear();
this.superClassesCache.clear(); this.superClassSubscriptions.clear();
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear(); this.arrayVersionCache.clear();
this.varArgSuperClassSubscriptions.clear(); this.varArgSubscriptions.clear();
} finally { this.varArgSuperClassSubscriptions.clear();
this.lock.unlock();
}
} }
public void subscribe(Object listener) { public void subscribe(Object listener) {
@ -140,172 +141,106 @@ public class SubscriptionManager {
return; return;
} }
// these are concurrent collections
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Map<Class<?>, Collection<Subscription>> subsPerListener = this.subscriptionsPerListener;
Collection<Subscription> subscriptions = subsPerListener.get(listenerClass); // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
if (subscriptions == null) { // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
// we could lock later, in the loop (where it actually needs to be locked), but doing so slows it down (as per benchmarking) synchronized(listenerClass) {
this.lock.lock(); ConcurrentMap<Class<?>, Collection<Subscription>> subsPerListener2 = this.subscriptionsPerListener;
try { Collection<Subscription> subsPerListener = subsPerListener2.get(listenerClass);
if (subsPerListener == null) {
// a listener is subscribed for the first time // a listener is subscribed for the first time
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size(); int handlersSize = messageHandlers.size();
if (handlersSize == 0) { if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found // remember the class as non listening class if no handlers are found
this.nonListeners.put(listener.getClass(), Boolean.TRUE); this.nonListeners.put(listenerClass, Boolean.TRUE);
return; return;
} else { } else {
subscriptions = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1); // really was null
Map<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle; subsPerListener = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1);
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
// create NEW subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) { for (MessageHandler messageHandler : messageHandlers) {
Collection<Subscription> subsPerType = null;
// now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length;
switch (size) {
case 1: {
Collection<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get());
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subsPerType = this.subHolderConcurrent.get();
this.subHolderConcurrent.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE));
getSuperClass(types[0]);
}
break;
}
case 2: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
getSuperClass(types[0]);
getSuperClass(types[1]);
}
break;
}
case 3: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
getSuperClass(types[0]);
getSuperClass(types[1]);
getSuperClass(types[2]);
}
break;
}
default: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
for (Class<?> c : types) {
getSuperClass(c);
}
}
break;
}
}
// create the subscription // create the subscription
Subscription subscription = new Subscription(messageHandler); Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener); subscription.subscribe(listener);
subscriptions.add(subscription); subsPerListener.add(subscription);
subsPerType.add(subscription);
// now add this subscription to each of the handled types
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
Collection<Subscription> subs = subsPerMessageSingle.get(clazz);
if (subs == null || subs.isEmpty()) {
subs = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1);
subsPerMessageSingle.put(clazz, subs);
}
subs.add(subscription);
getSuperClass(clazz);
} else {
// multiversion
HashMapTree<Class<?>, Collection<Subscription>> tree;
switch (size) {
case 2: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
getSuperClass(handledMessageTypes[0]);
getSuperClass(handledMessageTypes[1]);
break;
}
case 3: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
getSuperClass(handledMessageTypes[0]);
getSuperClass(handledMessageTypes[1]);
getSuperClass(handledMessageTypes[2]);
break;
}
default: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
for (Class<?> c : handledMessageTypes) {
getSuperClass(c);
}
break;
}
}
Collection<Subscription> subs = tree.getValue();
if (subs == null) {
subs = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1);
tree.putValue(subs);
}
subs.add(subscription);
}
} }
// order is critical for safe publication subsPerListener2.put(listenerClass, subsPerListener);
subsPerListener.put(listenerClass, subscriptions); }
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subsPerListener) {
subscription.subscribe(listener);
} }
} finally {
this.lock.unlock();
}
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
} }
} }
//
// if (subscriptions != null) {
// // subscriptions already exist and must only be updated
// for (Subscription subscription : subscriptions) {
// subscription.subscribe(listener);
// }
// }
// else {
// // a listener is subscribed for the first time
// Collection<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
// int handlersSize = messageHandlers.size();
//
// if (handlersSize == 0) {
// // remember the class as non listening class if no handlers are found
// this.nonListeners.put(listenerClass, this.holder);
// } else {
// subscriptions = new StrongConcurrentSet<Subscription>(handlersSize, this.LOAD_FACTOR);
//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, this.MAP_STRIPING));
//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1));
//// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR));
// this.subscriptionsPerListener.put(listenerClass, subscriptions);
//
// resetSuperClassSubs();
//
// // create NEW subscriptions for all detected message handlers
// for (MessageHandler messageHandler : messageHandlers) {
// // create the subscription
// Subscription subscription = new Subscription(messageHandler);
// subscription.subscribe(listener);
//
// subscriptions.add(subscription);
//
// //
// // save the subscription per message type
// //
// // single or multi?
// Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
// int size = handledMessageTypes.length;
// boolean acceptsSubtypes = subscription.acceptsSubtypes();
//
// if (size == 1) {
// // single
// Class<?> clazz = handledMessageTypes[0];
//
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
// if (subs == null) {
// Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
// if (putIfAbsent != null) {
// subs = putIfAbsent;
// } else {
// subs = this.subInitialValue.get();
//// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(8, this.LOAD_FACTOR, 1)));
//// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap<Subscription>(8, this.LOAD_FACTOR)));
// this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
//// this.subInitialValue.set(new ArrayDeque<Subscription>(8));
// }
// }
//
// subs.add(subscription);
//
// if (acceptsSubtypes) {
// // race conditions will result in duplicate answers, which we don't care about
// setupSuperClassCache(clazz);
// }
// }
// else {
// }
// }
// }
// }
} }
public final void unsubscribe(Object listener) { public final void unsubscribe(Object listener) {
@ -319,70 +254,19 @@ public class SubscriptionManager {
return; return;
} }
// these are a concurrent collection // these are concurrent collections
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass); this.superClassSubscriptions.clear();
if (subscriptions != null) { this.varArgSubscriptions.clear();
// we could lock later, in the loop (where it actually needs to be locked), but doing so slows it down (as per benchmarking) this.varArgSuperClassSubscriptions.clear();
this.lock.lock();
try {
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Map<Class<?>, Collection<Subscription>> localSingle = this.subscriptionsPerMessageSingle;
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
synchronized(listenerClass) {
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) { for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener); // this is thread safe, but the following stuff is NOT thread safe. subscription.unsubscribe(listener);
boolean isEmpty = subscription.isEmpty();
if (isEmpty) {
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
Collection<Subscription> subs = localSingle.get(clazz);
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove element
localSingle.remove(clazz);
}
}
} else {
// // NOTE: Not thread-safe! must be synchronized in outer scope
// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
// switch (size) {
// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
// }
//
// if (tree != null) {
// Collection<Subscription> subs = tree.getValue();
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove tree element
// switch (size) {
// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
// }
// }
// }
// }
}
}
} }
} finally {
this.lock.unlock();
} }
} }
} }
@ -585,7 +469,10 @@ public class SubscriptionManager {
Collection<Subscription> subsPerType = null; Collection<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf == null) { if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE); subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
@ -605,7 +492,6 @@ public class SubscriptionManager {
while (iterator1.hasNext()) { while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next(); eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1; boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) { if (type1Matches) {
continue; continue;
} }
@ -653,11 +539,14 @@ public class SubscriptionManager {
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
Collection<Subscription> subsPerType = null; Collection<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf == null) { if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
Collection<Class<?>> types1 = this.superClassesCache.get(superType1); Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2); Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Class<?>> types3 = this.superClassesCache.get(superType3); Collection<Class<?>> types3 = this.superClassesCache.get(superType3);
@ -680,6 +569,9 @@ public class SubscriptionManager {
while (iterator1.hasNext()) { while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next(); eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1; boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) { if (leaf1 != null) {
@ -721,7 +613,7 @@ public class SubscriptionManager {
} }
} }
Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent != null) { if (putIfAbsent != null) {
// someone beat us // someone beat us
subsPerType = putIfAbsent; subsPerType = putIfAbsent;

View File

@ -0,0 +1,24 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionHolder extends ThreadLocal<Collection<Subscription>> {
private final int stripeSize;
private final float loadFactor;
public SubscriptionHolder(float loadFactor, int stripeSize) {
super();
this.stripeSize = stripeSize;
this.loadFactor = loadFactor;
}
@Override
protected Collection<Subscription> initialValue() {
return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor, this.stripeSize);
}
}

View File

@ -92,7 +92,7 @@ public class Subscription {
return this.listeners.size(); return this.listeners.size();
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
Collection<Object> listeners = this.listeners; Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
@ -138,10 +138,12 @@ public class Subscription {
.setPublishedObject(message)); .setPublishedObject(message));
} }
} }
return true;
} }
return false;
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
Collection<Object> listeners = this.listeners; Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
@ -191,10 +193,12 @@ public class Subscription {
.setPublishedObject(message1, message2)); .setPublishedObject(message1, message2));
} }
} }
return true;
} }
return false;
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
Collection<Object> listeners = this.listeners; Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
@ -246,7 +250,9 @@ public class Subscription {
.setPublishedObject(message1, message2, message3)); .setPublishedObject(message1, message2, message3));
} }
} }
return true;
} }
return false;
} }

View File

@ -150,7 +150,7 @@ public class MetadataReaderTest extends AssertSupport {
@SuppressWarnings("unused") @SuppressWarnings("unused")
public class MessageListener1 { public class MessageListener1 {
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handleObject(Object o) { public void handleObject(Object o) {
} }
@ -181,7 +181,7 @@ public class MetadataReaderTest extends AssertSupport {
// narrow the handler // narrow the handler
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handleAny(Object o) { public void handleAny(Object o) {
} }

View File

@ -24,7 +24,7 @@ public class MultiMessageTest extends MessageBusTest {
Listener listener1 = new Listener(); Listener listener1 = new Listener();
bus.subscribe(listener1); bus.subscribe(listener1);
// bus.unsubscribe(listener1); bus.unsubscribe(listener1);
bus.publish("s"); bus.publish("s");
bus.publish("s", "s"); bus.publish("s", "s");
@ -42,7 +42,7 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish(1, 2, "s"); bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(10, count.get()); assertEquals(12, count.get());
bus.shutdown(); bus.shutdown();
@ -50,11 +50,11 @@ public class MultiMessageTest extends MessageBusTest {
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class Listener { public static class Listener {
// @Handler @Handler
// public void handleSync(String o1) { public void handleSync(String o1) {
// count.getAndIncrement(); count.getAndIncrement();
// System.err.println("match String"); System.err.println("match String");
// } }
@Handler @Handler
public void handleSync(String o1, String o2) { public void handleSync(String o1, String o2) {
@ -62,18 +62,18 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match String, String"); System.err.println("match String, String");
} }
// @Handler @Handler
// public void handleSync(String o1, String o2, String o3) { public void handleSync(String o1, String o2, String o3) {
// count.getAndIncrement(); count.getAndIncrement();
// System.err.println("match String, String, String"); System.err.println("match String, String, String");
// } }
//
// @Handler @Handler
// public void handleSync(Integer o1, Integer o2, String o3) { public void handleSync(Integer o1, Integer o2, String o3) {
// count.getAndIncrement(); count.getAndIncrement();
// System.err.println("match Integer, Integer, String"); System.err.println("match Integer, Integer, String");
// } }
//
@Handler(acceptVarargs=true) @Handler(acceptVarargs=true)
public void handleSync(String... o) { public void handleSync(String... o) {
count.getAndIncrement(); count.getAndIncrement();

View File

@ -30,7 +30,7 @@ public class AbstractMessageListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(AbstractMessage message){ public void handle(AbstractMessage message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class ICountableListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(ICountable message){ public void handle(ICountable message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class IMessageListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(IMessage message){ public void handle(IMessage message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class IMultipartMessageListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(IMultipartMessage message){ public void handle(IMultipartMessage message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class MessagesListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(MessageTypes message){ public void handle(MessageTypes message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class MultipartMessageListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(MultipartMessage message){ public void handle(MultipartMessage message){
super.handle(message); super.handle(message);
} }

View File

@ -30,7 +30,7 @@ public class StandardMessageListener {
public static class NoSubtypesListener extends BaseListener { public static class NoSubtypesListener extends BaseListener {
@Override @Override
@Handler(acceptSubtypes = true) @Handler(acceptSubtypes = false)
public void handle(StandardMessage message){ public void handle(StandardMessage message){
super.handle(message); super.handle(message);
} }