From 45614e8bdd39b5de00175ff01d4ea4773aacb4df Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 27 Feb 2015 16:32:05 +0100 Subject: [PATCH] WIP cleanup, unit tests --- .../util/messagebus/MultiMBassador.java | 53 +-- .../util/messagebus/SubscriptionManager.java | 360 ++++++------------ .../messagebus/common/SubscriptionHolder.java | 24 ++ .../messagebus/subscription/Subscription.java | 12 +- .../util/messagebus/MetadataReaderTest.java | 4 +- .../util/messagebus/MultiMessageTest.java | 38 +- .../listeners/AbstractMessageListener.java | 2 +- .../listeners/ICountableListener.java | 2 +- .../listeners/IMessageListener.java | 2 +- .../listeners/IMultipartMessageListener.java | 2 +- .../listeners/MessagesListener.java | 2 +- .../listeners/MultipartMessageListener.java | 2 +- .../listeners/StandardMessageListener.java | 2 +- 13 files changed, 204 insertions(+), 301 deletions(-) create mode 100644 src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index e930009..2e74c03 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -174,15 +174,13 @@ public class MultiMBassador implements IMessageBus { Class messageClass = message.getClass(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean publishDeadSubs = true; + boolean subsPublished = false; // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { - publishDeadSubs = false; - for (Subscription sub : subscriptions) { // this catches all exception types - sub.publishToSubscription(this, message); + subsPublished |= sub.publishToSubscription(this, message); } } @@ -190,11 +188,9 @@ public class MultiMBassador implements IMessageBus { Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses if (superSubscriptions != null && !superSubscriptions.isEmpty()) { - publishDeadSubs = false; - for (Subscription sub : superSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, message); + subsPublished |= sub.publishToSubscription(this, message); } } @@ -205,22 +201,18 @@ public class MultiMBassador implements IMessageBus { Collection varargSubscriptions = manager.getVarArgSubscriptions(messageClass); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { - publishDeadSubs = false; - asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message; for (Subscription sub : varargSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } Collection varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { - publishDeadSubs = false; - if (asArray == null) { asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message; @@ -228,13 +220,13 @@ public class MultiMBassador implements IMessageBus { for (Subscription sub : varargSuperSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } } } - if (publishDeadSubs) { + if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { @@ -255,15 +247,14 @@ public class MultiMBassador implements IMessageBus { Class messageClass2 = message2.getClass(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); - boolean publishDeadSubs = true; + boolean subsPublished = false; // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { - publishDeadSubs = false; for (Subscription sub : subscriptions) { // this catches all exception types - sub.publishToSubscription(this, message1, message2); + subsPublished |= sub.publishToSubscription(this, message1, message2); } } @@ -271,10 +262,9 @@ public class MultiMBassador implements IMessageBus { Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // now get superClasses if (superSubscriptions != null && !superSubscriptions.isEmpty()) { - publishDeadSubs = false; for (Subscription sub : superSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, message1, message2); + subsPublished |= sub.publishToSubscription(this, message1, message2); } } @@ -284,23 +274,19 @@ public class MultiMBassador implements IMessageBus { Collection varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { - publishDeadSubs = false; - asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray[0] = message1; asArray[1] = message2; for (Subscription sub : varargSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } Collection varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { - publishDeadSubs = false; - if (asArray == null) { asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray[0] = message1; @@ -309,14 +295,14 @@ public class MultiMBassador implements IMessageBus { for (Subscription sub : varargSuperSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } } } - if (publishDeadSubs) { + if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { @@ -338,14 +324,13 @@ public class MultiMBassador implements IMessageBus { Class messageClass3 = message3.getClass(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); - boolean publishDeadSubs = true; + boolean subsPublished = false; // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { - publishDeadSubs = false; for (Subscription sub : subscriptions) { // this catches all exception types - sub.publishToSubscription(this, message1, message2, message3); + subsPublished |= sub.publishToSubscription(this, message1, message2, message3); } } @@ -365,8 +350,6 @@ public class MultiMBassador implements IMessageBus { Object[] asArray = null; Collection varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { - publishDeadSubs = false; - asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray[0] = message1; asArray[1] = message2; @@ -374,15 +357,13 @@ public class MultiMBassador implements IMessageBus { for (Subscription sub : varargSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } Collection varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { - publishDeadSubs = false; - if (asArray == null) { asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray[0] = message1; @@ -392,14 +373,14 @@ public class MultiMBassador implements IMessageBus { for (Subscription sub : varargSuperSubscriptions) { // this catches all exception types - sub.publishToSubscription(this, asArray); + subsPublished |= sub.publishToSubscription(this, asArray); } } } } - if (publishDeadSubs) { + if (!subsPublished) { // Dead Event must EXACTLY MATCH (no subclasses) Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 32e6906..377f1ed 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -5,13 +5,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentMap; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.ReflectionUtils; import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import dorkbox.util.messagebus.common.SubscriptionHolder; import dorkbox.util.messagebus.common.SuperClassIterator; import dorkbox.util.messagebus.listener.MessageHandler; import dorkbox.util.messagebus.listener.MetadataReader; @@ -51,18 +52,14 @@ public class SubscriptionManager { // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, Collection> subscriptionsPerMessageSingle; + private final ConcurrentMap, Collection> subscriptionsPerMessageSingle; private final HashMapTree, Collection> subscriptionsPerMessageMulti; - // synchronize read/write access to the subscription maps - private final ReentrantLock lock = new ReentrantLock(); - - // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change - private final Map, Collection> subscriptionsPerListener; + private final ConcurrentMap, Collection> subscriptionsPerListener; private final Map, Class> arrayVersionCache; private final Map, Collection>> superClassesCache; @@ -79,6 +76,9 @@ public class SubscriptionManager { // stripe size of maps for concurrency private final int STRIPE_SIZE; + private final SubscriptionHolder subHolder; + private final SubscriptionHolder subHolderConcurrent; + SubscriptionManager(int numberOfThreads) { this.STRIPE_SIZE = numberOfThreads; @@ -109,23 +109,24 @@ public class SubscriptionManager { this.varArgSubscriptions = new ConcurrentHashMapV8, Collection>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, Collection>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); } + + this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); + this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); } public void shutdown() { - this.lock.lock(); - try { - this.nonListeners.clear(); - this.subscriptionsPerMessageSingle.clear(); - this.subscriptionsPerMessageMulti.clear(); - this.subscriptionsPerListener.clear(); - this.arrayVersionCache.clear(); - this.superClassesCache.clear(); - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); - } finally { - this.lock.unlock(); - } + this.nonListeners.clear(); + + this.subscriptionsPerMessageSingle.clear(); + this.subscriptionsPerMessageMulti.clear(); + this.subscriptionsPerListener.clear(); + + this.superClassesCache.clear(); + this.superClassSubscriptions.clear(); + + this.arrayVersionCache.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); } public void subscribe(Object listener) { @@ -140,172 +141,106 @@ public class SubscriptionManager { return; } + // these are concurrent collections + this.superClassSubscriptions.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); - Map, Collection> subsPerListener = this.subscriptionsPerListener; - Collection subscriptions = subsPerListener.get(listenerClass); - if (subscriptions == null) { - // we could lock later, in the loop (where it actually needs to be locked), but doing so slows it down (as per benchmarking) - this.lock.lock(); - try { + + // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. + // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency + synchronized(listenerClass) { + ConcurrentMap, Collection> subsPerListener2 = this.subscriptionsPerListener; + Collection subsPerListener = subsPerListener2.get(listenerClass); + if (subsPerListener == null) { // a listener is subscribed for the first time - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); - - Collection messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); int handlersSize = messageHandlers.size(); if (handlersSize == 0) { // remember the class as non listening class if no handlers are found - this.nonListeners.put(listener.getClass(), Boolean.TRUE); + this.nonListeners.put(listenerClass, Boolean.TRUE); return; } else { - subscriptions = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1); - Map, Collection> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + // really was null + subsPerListener = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1); + ConcurrentMap, Collection> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + - // create NEW subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { + Collection subsPerType = null; + + // now add this subscription to each of the handled types + Class[] types = messageHandler.getHandledMessages(); + int size = types.length; + switch (size) { + case 1: { + Collection putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get()); + if (putIfAbsent != null) { + subsPerType = putIfAbsent; + } else { + subsPerType = this.subHolderConcurrent.get(); + this.subHolderConcurrent.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE)); + getSuperClass(types[0]); + } + break; + } + case 2: { + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); + if (putIfAbsent != null) { + subsPerType = putIfAbsent; + } else { + subsPerType = this.subHolder.get(); + this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + getSuperClass(types[0]); + getSuperClass(types[1]); + } + break; + } + case 3: { + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); + if (putIfAbsent != null) { + subsPerType = putIfAbsent; + } else { + subsPerType = this.subHolder.get(); + this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + getSuperClass(types[0]); + getSuperClass(types[1]); + getSuperClass(types[2]); + } + break; + } + default: { + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); + if (putIfAbsent != null) { + subsPerType = putIfAbsent; + } else { + subsPerType = this.subHolder.get(); + this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + for (Class c : types) { + getSuperClass(c); + } + } + break; + } + } + // create the subscription Subscription subscription = new Subscription(messageHandler); subscription.subscribe(listener); - subscriptions.add(subscription); - - // now add this subscription to each of the handled types - Class[] handledMessageTypes = subscription.getHandledMessageTypes(); - int size = handledMessageTypes.length; - if (size == 1) { - // single - Class clazz = handledMessageTypes[0]; - Collection subs = subsPerMessageSingle.get(clazz); - if (subs == null || subs.isEmpty()) { - subs = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1); - subsPerMessageSingle.put(clazz, subs); - } - - subs.add(subscription); - getSuperClass(clazz); - } else { - // multiversion - HashMapTree, Collection> tree; - - switch (size) { - case 2: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); - getSuperClass(handledMessageTypes[0]); - getSuperClass(handledMessageTypes[1]); - break; - } - case 3: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); - getSuperClass(handledMessageTypes[0]); - getSuperClass(handledMessageTypes[1]); - getSuperClass(handledMessageTypes[2]); - break; - } - default: { - tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); - for (Class c : handledMessageTypes) { - getSuperClass(c); - } - break; - } - } - - Collection subs = tree.getValue(); - if (subs == null) { - subs = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1); - tree.putValue(subs); - } - subs.add(subscription); - } + subsPerListener.add(subscription); + subsPerType.add(subscription); } - // order is critical for safe publication - subsPerListener.put(listenerClass, subscriptions); + subsPerListener2.put(listenerClass, subsPerListener); + } + } else { + // subscriptions already exist and must only be updated + for (Subscription subscription : subsPerListener) { + subscription.subscribe(listener); } - } finally { - this.lock.unlock(); - } - } else { - // subscriptions already exist and must only be updated - for (Subscription subscription : subscriptions) { - subscription.subscribe(listener); } } - - -// -// if (subscriptions != null) { -// // subscriptions already exist and must only be updated -// for (Subscription subscription : subscriptions) { -// subscription.subscribe(listener); -// } -// } -// else { -// // a listener is subscribed for the first time -// Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); -// int handlersSize = messageHandlers.size(); -// -// if (handlersSize == 0) { -// // remember the class as non listening class if no handlers are found -// this.nonListeners.put(listenerClass, this.holder); -// } else { -// subscriptions = new StrongConcurrentSet(handlersSize, this.LOAD_FACTOR); -//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, this.MAP_STRIPING)); -//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1)); -//// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR)); -// this.subscriptionsPerListener.put(listenerClass, subscriptions); -// -// resetSuperClassSubs(); -// -// // create NEW subscriptions for all detected message handlers -// for (MessageHandler messageHandler : messageHandlers) { -// // create the subscription -// Subscription subscription = new Subscription(messageHandler); -// subscription.subscribe(listener); -// -// subscriptions.add(subscription); -// -// // -// // save the subscription per message type -// // -// // single or multi? -// Class[] handledMessageTypes = subscription.getHandledMessageTypes(); -// int size = handledMessageTypes.length; -// boolean acceptsSubtypes = subscription.acceptsSubtypes(); -// -// if (size == 1) { -// // single -// Class clazz = handledMessageTypes[0]; -// -// Collection subs = this.subscriptionsPerMessageSingle.get(clazz); -// if (subs == null) { -// Collection putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get()); -// if (putIfAbsent != null) { -// subs = putIfAbsent; -// } else { -// subs = this.subInitialValue.get(); -//// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1))); -//// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR))); -// this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); -//// this.subInitialValue.set(new ArrayDeque(8)); -// } -// } -// -// subs.add(subscription); -// -// if (acceptsSubtypes) { -// // race conditions will result in duplicate answers, which we don't care about -// setupSuperClassCache(clazz); -// } -// } -// else { - -// } -// } -// } -// } } public final void unsubscribe(Object listener) { @@ -319,70 +254,19 @@ public class SubscriptionManager { return; } - // these are a concurrent collection - Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); - if (subscriptions != null) { - // we could lock later, in the loop (where it actually needs to be locked), but doing so slows it down (as per benchmarking) - this.lock.lock(); - try { - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); - - Map, Collection> localSingle = this.subscriptionsPerMessageSingle; + // these are concurrent collections + this.superClassSubscriptions.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); + // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. + // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency + synchronized(listenerClass) { + Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); + if (subscriptions != null) { for (Subscription subscription : subscriptions) { - subscription.unsubscribe(listener); // this is thread safe, but the following stuff is NOT thread safe. - - boolean isEmpty = subscription.isEmpty(); - - if (isEmpty) { - // single or multi? - Class[] handledMessageTypes = subscription.getHandledMessageTypes(); - int size = handledMessageTypes.length; - if (size == 1) { - // single - Class clazz = handledMessageTypes[0]; - - Collection subs = localSingle.get(clazz); - if (subs != null) { - subs.remove(subscription); - - if (subs.isEmpty()) { - // remove element - localSingle.remove(clazz); - } - } - } else { -// // NOTE: Not thread-safe! must be synchronized in outer scope -// IdentityObjectTree, Collection> tree; - // -// switch (size) { -// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break; -// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; -// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break; -// } - // -// if (tree != null) { -// Collection subs = tree.getValue(); -// if (subs != null) { -// subs.remove(subscription); - // -// if (subs.isEmpty()) { -// // remove tree element -// switch (size) { -// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break; -// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; -// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break; -// } -// } -// } -// } - } - } + subscription.unsubscribe(listener); } - } finally { - this.lock.unlock(); } } } @@ -585,7 +469,10 @@ public class SubscriptionManager { Collection subsPerType = null; // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf == null) { + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { subsPerType = new StrongConcurrentSetV8(16, LOAD_FACTOR, this.STRIPE_SIZE); // whenever our subscriptions change, this map is cleared. @@ -605,7 +492,6 @@ public class SubscriptionManager { while (iterator1.hasNext()) { eventSuperType1 = iterator1.next(); boolean type1Matches = eventSuperType1 == superType1; - if (type1Matches) { continue; } @@ -653,11 +539,14 @@ public class SubscriptionManager { // whenever our subscriptions change, this map is cleared. HashMapTree, Collection> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); - Collection subsPerType = null; + Collection subsPerType; // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf == null) { + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { Collection> types1 = this.superClassesCache.get(superType1); Collection> types2 = this.superClassesCache.get(superType2); Collection> types3 = this.superClassesCache.get(superType3); @@ -680,6 +569,9 @@ public class SubscriptionManager { while (iterator1.hasNext()) { eventSuperType1 = iterator1.next(); boolean type1Matches = eventSuperType1 == superType1; + if (type1Matches) { + continue; + } leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); if (leaf1 != null) { @@ -721,7 +613,7 @@ public class SubscriptionManager { } } - Collection putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); + Collection putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); if (putIfAbsent != null) { // someone beat us subsPerType = putIfAbsent; diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java new file mode 100644 index 0000000..d7f4ed3 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java @@ -0,0 +1,24 @@ +package dorkbox.util.messagebus.common; + +import java.util.Collection; + +import dorkbox.util.messagebus.subscription.Subscription; + +public class SubscriptionHolder extends ThreadLocal> { + + private final int stripeSize; + private final float loadFactor; + + public SubscriptionHolder(float loadFactor, int stripeSize) { + super(); + + this.stripeSize = stripeSize; + this.loadFactor = loadFactor; + } + + @Override + protected Collection initialValue() { + return new StrongConcurrentSetV8(16, this.loadFactor, this.stripeSize); + } +} + diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 69f8ecf..abd932c 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -92,7 +92,7 @@ public class Subscription { return this.listeners.size(); } - public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { + public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { Collection listeners = this.listeners; if (!listeners.isEmpty()) { @@ -138,10 +138,12 @@ public class Subscription { .setPublishedObject(message)); } } + return true; } + return false; } - public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { + public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { Collection listeners = this.listeners; if (!listeners.isEmpty()) { @@ -191,10 +193,12 @@ public class Subscription { .setPublishedObject(message1, message2)); } } + return true; } + return false; } - public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { + public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { Collection listeners = this.listeners; if (!listeners.isEmpty()) { @@ -246,7 +250,9 @@ public class Subscription { .setPublishedObject(message1, message2, message3)); } } + return true; } + return false; } diff --git a/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java b/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java index 841be19..46d9255 100644 --- a/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java +++ b/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java @@ -150,7 +150,7 @@ public class MetadataReaderTest extends AssertSupport { @SuppressWarnings("unused") public class MessageListener1 { - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handleObject(Object o) { } @@ -181,7 +181,7 @@ public class MetadataReaderTest extends AssertSupport { // narrow the handler @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handleAny(Object o) { } diff --git a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java index 1bafc31..1ba5d9b 100644 --- a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java +++ b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java @@ -24,7 +24,7 @@ public class MultiMessageTest extends MessageBusTest { Listener listener1 = new Listener(); bus.subscribe(listener1); -// bus.unsubscribe(listener1); + bus.unsubscribe(listener1); bus.publish("s"); bus.publish("s", "s"); @@ -42,7 +42,7 @@ public class MultiMessageTest extends MessageBusTest { bus.publish(1, 2, "s"); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); - assertEquals(10, count.get()); + assertEquals(12, count.get()); bus.shutdown(); @@ -50,11 +50,11 @@ public class MultiMessageTest extends MessageBusTest { @SuppressWarnings("unused") public static class Listener { -// @Handler -// public void handleSync(String o1) { -// count.getAndIncrement(); -// System.err.println("match String"); -// } + @Handler + public void handleSync(String o1) { + count.getAndIncrement(); + System.err.println("match String"); + } @Handler public void handleSync(String o1, String o2) { @@ -62,18 +62,18 @@ public class MultiMessageTest extends MessageBusTest { System.err.println("match String, String"); } -// @Handler -// public void handleSync(String o1, String o2, String o3) { -// count.getAndIncrement(); -// System.err.println("match String, String, String"); -// } -// -// @Handler -// public void handleSync(Integer o1, Integer o2, String o3) { -// count.getAndIncrement(); -// System.err.println("match Integer, Integer, String"); -// } -// + @Handler + public void handleSync(String o1, String o2, String o3) { + count.getAndIncrement(); + System.err.println("match String, String, String"); + } + + @Handler + public void handleSync(Integer o1, Integer o2, String o3) { + count.getAndIncrement(); + System.err.println("match Integer, Integer, String"); + } + @Handler(acceptVarargs=true) public void handleSync(String... o) { count.getAndIncrement(); diff --git a/src/test/java/dorkbox/util/messagebus/listeners/AbstractMessageListener.java b/src/test/java/dorkbox/util/messagebus/listeners/AbstractMessageListener.java index 64ccc05..5b969df 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/AbstractMessageListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/AbstractMessageListener.java @@ -30,7 +30,7 @@ public class AbstractMessageListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(AbstractMessage message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/ICountableListener.java b/src/test/java/dorkbox/util/messagebus/listeners/ICountableListener.java index 89fd024..5708520 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/ICountableListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/ICountableListener.java @@ -30,7 +30,7 @@ public class ICountableListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(ICountable message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/IMessageListener.java b/src/test/java/dorkbox/util/messagebus/listeners/IMessageListener.java index 2dff149..7af1908 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/IMessageListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/IMessageListener.java @@ -30,7 +30,7 @@ public class IMessageListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(IMessage message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/IMultipartMessageListener.java b/src/test/java/dorkbox/util/messagebus/listeners/IMultipartMessageListener.java index e259609..0501f60 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/IMultipartMessageListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/IMultipartMessageListener.java @@ -30,7 +30,7 @@ public class IMultipartMessageListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(IMultipartMessage message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/MessagesListener.java b/src/test/java/dorkbox/util/messagebus/listeners/MessagesListener.java index 74769da..d2e387a 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/MessagesListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/MessagesListener.java @@ -30,7 +30,7 @@ public class MessagesListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(MessageTypes message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/MultipartMessageListener.java b/src/test/java/dorkbox/util/messagebus/listeners/MultipartMessageListener.java index bf13f76..d5704d1 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/MultipartMessageListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/MultipartMessageListener.java @@ -30,7 +30,7 @@ public class MultipartMessageListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(MultipartMessage message){ super.handle(message); } diff --git a/src/test/java/dorkbox/util/messagebus/listeners/StandardMessageListener.java b/src/test/java/dorkbox/util/messagebus/listeners/StandardMessageListener.java index 70b7775..d9c6980 100644 --- a/src/test/java/dorkbox/util/messagebus/listeners/StandardMessageListener.java +++ b/src/test/java/dorkbox/util/messagebus/listeners/StandardMessageListener.java @@ -30,7 +30,7 @@ public class StandardMessageListener { public static class NoSubtypesListener extends BaseListener { @Override - @Handler(acceptSubtypes = true) + @Handler(acceptSubtypes = false) public void handle(StandardMessage message){ super.handle(message); }