diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 10634a9..0f3bfa8 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -2,7 +2,6 @@ package dorkbox.util.messagebus; import java.util.ArrayDeque; import java.util.Collection; -import java.util.Iterator; import org.jctools.util.Pow2; @@ -193,29 +192,46 @@ public class MultiMBassador implements IMessageBus { @Override public void publish(final Object message) { try { + boolean subsPublished = false; SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null - boolean subsPublished = false; - - Iterator iterator; + Subscription[] subscriptions; Subscription sub; + subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null // Run subscriptions - if (subscriptions != null && !subscriptions.isEmpty()) { - for (iterator = subscriptions.iterator(); iterator.hasNext();) { - sub = iterator.next(); + if (subscriptions != null) { + int length = subscriptions.length; + if (length > 0) { + for (int i=0;i superSubscriptions = manager.getSuperSubscriptions(messageClass); + + if (!this.forceExactMatches) { +// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null // // now get superClasses +// int length = superSubscriptions.length; +// +// if (length > 0) { +// for (int i=0;i deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { - DeadMessage deadMessage = new DeadMessage(message); + Subscription[] deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + if (deadSubscriptions != null) { + int length = deadSubscriptions.length; + if (length > 0) { + DeadMessage deadMessage = new DeadMessage(message); - for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) { - sub = iterator.next(); + for (int i=0;i, Collection> subscriptionsPerMessageSingle; - private final HashMapTree, Collection> subscriptionsPerMessageMulti; + private final Map, ArrayList> subscriptionsPerMessageSingle; + private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing @@ -70,6 +71,7 @@ public class SubscriptionManager { private final StampedLock lock = new StampedLock(); +// private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final SubscriptionHolder subHolderSingle; private final SubscriptionHolder subHolderConcurrent; @@ -84,8 +86,8 @@ public class SubscriptionManager { { this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, loadFactor, this.STRIPE_SIZE); - this.subscriptionsPerMessageSingle = new HashMap, Collection>(64); - this.subscriptionsPerMessageMulti = new HashMapTree, Collection>(4, loadFactor); + this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, ArrayList>(64, LOAD_FACTOR, 1); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, loadFactor); // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMapV8, ConcurrentSet>(4, loadFactor, this.STRIPE_SIZE); @@ -97,8 +99,8 @@ public class SubscriptionManager { // it's a hit on SUB/UNSUB, but improves performance of handlers this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE); - this.subHolderSingle = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE); - this.subHolderConcurrent = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE); + this.subHolderSingle = new SubscriptionHolder(); + this.subHolderConcurrent = new SubscriptionHolder(); } public void shutdown() { @@ -138,7 +140,10 @@ public class SubscriptionManager { // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, // but for concurrency because there are race conditions here if we don't. - long stamp = this.lock.writeLock(); + StampedLock lock = this.lock; + long stamp = lock.writeLock(); +// Lock writeLock = this.lock.writeLock(); +// writeLock.lock(); subsPerListener = subsPerListener2.get(listenerClass); @@ -153,15 +158,16 @@ public class SubscriptionManager { } this.lock.unlockWrite(stamp); +// writeLock.unlock(); return; } // subsPerListener == null, so we now enter an exclusive write lock and double check. -// long origStamp = stamp; -// if ((stamp = this.lock.tryConvertToWriteLock(stamp)) == 0) { -// this.lock.unlockRead(origStamp); -// stamp = this.lock.writeLock(); -// } + long origStamp = stamp; + if ((stamp = this.lock.tryConvertToWriteLock(stamp)) == 0) { + this.lock.unlockRead(origStamp); + stamp = this.lock.writeLock(); + } subsPerListener = subsPerListener2.get(listenerClass); @@ -176,6 +182,7 @@ public class SubscriptionManager { } this.lock.unlockWrite(stamp); +// writeLock.unlock(); return; } @@ -188,13 +195,14 @@ public class SubscriptionManager { // remember the class as non listening class if no handlers are found this.nonListeners.put(listenerClass, Boolean.TRUE); this.lock.unlockWrite(stamp); +// writeLock.unlock(); return; } else { subsPerListener = new ConcurrentSet(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); VarArgPossibility varArgPossibility = this.varArgPossibility; - Map, Collection> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - HashMapTree, Collection> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; Iterator iterator; @@ -218,33 +226,31 @@ public class SubscriptionManager { subsPerListener2.put(listenerClass, subsPerListener); + this.lock.unlockWrite(stamp); +// writeLock.unlock(); } } // inside a write lock private final Collection getSubsForPublication(MessageHandler messageHandler, - Map, Collection> subsPerMessageSingle, - HashMapTree, Collection> subsPerMessageMulti, + Map, ArrayList> subsPerMessageSingle, + HashMapTree, ArrayList> subsPerMessageMulti, VarArgPossibility varArgPossibility) { Class[] types = messageHandler.getHandledMessages(); int size = types.length; - ConcurrentSet subsPerType; +// ConcurrentSet subsPerType; SubscriptionUtils utils = this.utils; Class type0 = types[0]; switch (size) { case 1: { - Collection subs = subsPerMessageSingle.get(type0); + ArrayList subs = subsPerMessageSingle.get(type0); if (subs == null) { -// subs = new ConcurrentSet<>(16, LOAD_FACTOR, this.STRIPE_SIZE); - subs = new ArrayList<>(16); -// subs = new ConcurrentLinkedQueue2(); -// subs = new LinkedList(); - + subs = new ArrayList<>(8); boolean isArray = utils.isArray(type0); if (isArray) { @@ -252,7 +258,7 @@ public class SubscriptionManager { } // cache the super classes - utils.getSuperClasses(type0, isArray); + utils.cacheSuperClasses(type0, isArray); subsPerMessageSingle.put(type0, subs); } @@ -261,63 +267,64 @@ public class SubscriptionManager { } case 2: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); - if (putIfAbsent != null) { - return putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - - // cache the super classes - utils.getSuperClasses(type0); - utils.getSuperClasses(types[1]); - - return subsPerType; - } +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.get(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// // cache the super classes +// utils.cacheSuperClasses(type0); +// utils.cacheSuperClasses(types[1]); +// +// return subsPerType; +// } } case 3: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); - if (putIfAbsent != null) { - return putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - - // cache the super classes - utils.getSuperClasses(type0); - utils.getSuperClasses(types[1]); - utils.getSuperClasses(types[2]); - - return subsPerType; - } +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.get(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// // cache the super classes +// utils.cacheSuperClasses(type0); +// utils.cacheSuperClasses(types[1]); +// utils.cacheSuperClasses(types[2]); +// +// return subsPerType; +// } } default: { // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); - if (putIfAbsent != null) { - return putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - - Class c; - int length = types.length; - for (int i = 0; i < length; i++) { - c = types[i]; - - // cache the super classes - utils.getSuperClasses(c); - } - - return subsPerType; - } +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.get(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// Class c; +// int length = types.length; +// for (int i = 0; i < length; i++) { +// c = types[i]; +// +// // cache the super classes +// utils.cacheSuperClasses(c); +// } +// +// return subsPerType; +// } + return null; } } } @@ -334,9 +341,11 @@ public class SubscriptionManager { } // these are concurrent collections - clearConcurrentCollections(); +// clearConcurrentCollections(); long stamp = this.lock.writeLock(); +// Lock writeLock = this.lock.writeLock(); +// writeLock.lock(); Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { @@ -351,6 +360,7 @@ public class SubscriptionManager { } this.lock.unlockWrite(stamp); +// writeLock.unlock(); } private void clearConcurrentCollections() { @@ -359,39 +369,55 @@ public class SubscriptionManager { } // CAN RETURN NULL - public final Collection getSubscriptionsByMessageType(Class messageType) { + public final Subscription[] getSubscriptionsByMessageType(Class messageType) { Collection collection; - Collection subscriptions = null; + Subscription[] subscriptions = null; - long stamp = this.lock.tryOptimisticRead(); // non blocking + long stamp = this.lock.readLock(); +// Lock writeLock = this.lock.readLock(); +// writeLock.lock(); - collection = this.subscriptionsPerMessageSingle.get(messageType); - if (collection != null) { -// subscriptions = new ArrayDeque<>(collection); - subscriptions = new ArrayList<>(collection); -// subscriptions = new LinkedList<>(); -// subscriptions = new TreeSet(SubscriptionByPriorityDesc); - -// subscriptions.addAll(collection); - } - - if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock - stamp = this.lock.readLock(); - try { - collection = this.subscriptionsPerMessageSingle.get(messageType); - if (collection != null) { -// subscriptions = new ArrayDeque<>(collection); - subscriptions = new ArrayList<>(collection); -// subscriptions = new LinkedList<>(); -// subscriptions = new TreeSet(SubscriptionByPriorityDesc); - -// subscriptions.addAll(collection); - } - } - finally { - this.lock.unlockRead(stamp); + try { + collection = this.subscriptionsPerMessageSingle.get(messageType); + if (collection != null) { + subscriptions = collection.toArray(EMPTY); } } + finally { + this.lock.unlockRead(stamp); +// writeLock.unlock(); + } + + +// long stamp = this.lock.tryOptimisticRead(); // non blocking +// +// collection = this.subscriptionsPerMessageSingle.get(messageType); +// if (collection != null) { +//// subscriptions = new ArrayDeque<>(collection); +// subscriptions = new ArrayList<>(collection); +//// subscriptions = new LinkedList<>(); +//// subscriptions = new TreeSet(SubscriptionByPriorityDesc); +// +//// subscriptions.addAll(collection); +// } +// +// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock +// stamp = this.lock.readLock(); +// try { +// collection = this.subscriptionsPerMessageSingle.get(messageType); +// if (collection != null) { +//// subscriptions = new ArrayDeque<>(collection); +// subscriptions = new ArrayList<>(collection); +//// subscriptions = new LinkedList<>(); +//// subscriptions = new TreeSet(SubscriptionByPriorityDesc); +// +//// subscriptions.addAll(collection); +// } +// } +// finally { +// this.lock.unlockRead(stamp); +// } +// } return subscriptions; } @@ -459,7 +485,7 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public final Collection getSuperSubscriptions(Class superType) { + public final Subscription[] getSuperSubscriptions(Class superType) { return this.utils.getSuperSubscriptions(superType); } diff --git a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java index fb40131..e4f4100 100644 --- a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java @@ -58,6 +58,7 @@ public abstract class AbstractConcurrentSet extends pad implements Set stamp = this.lock.writeLock(); } + changed = insert(element); this.lock.unlock(stamp); diff --git a/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java b/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java index 4a75af1..278c317 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java +++ b/src/main/java/dorkbox/util/messagebus/common/ConcurrentHashMapV8.java @@ -4346,4 +4346,4 @@ final Node find(int h, Object k) { e.getCause()); } } -} \ No newline at end of file +} diff --git a/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java b/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java index 0a0f3bc..af7f2bd 100644 --- a/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java +++ b/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java @@ -13,4 +13,9 @@ public class StrongConcurrentSetV8 extends StrongConcurrentSet { // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks) super(new ConcurrentHashMapV8>(size, loadFactor, 1)); } + + public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) { + // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks) + super(new ConcurrentHashMapV8>(size, loadFactor, stripeSize)); + } } \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java index b8a22f8..d0864be 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java @@ -1,6 +1,7 @@ package dorkbox.util.messagebus.common; import java.lang.reflect.Array; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -8,50 +9,56 @@ import java.util.concurrent.ConcurrentMap; import dorkbox.util.messagebus.common.thread.ClassHolder; import dorkbox.util.messagebus.common.thread.ConcurrentSet; +import dorkbox.util.messagebus.common.thread.StampedLock; import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.subscription.Subscription; public class SubscriptionUtils { + private static final Class[] SUPER_CLASS_EMPTY = new Class[0]; + + private StampedLock superClassLock = new StampedLock(); + private final Map, Class> arrayVersionCache; private final Map, Boolean> isArrayCache; - private final ConcurrentMap, ConcurrentSet>> superClassesCache; + private final ConcurrentMap, ArrayList>> superClassesCache; private final ClassHolder classHolderSingle; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private final ConcurrentMap, ConcurrentSet> superClassSubscriptions; + private final ConcurrentMap, ArrayList> superClassSubscriptions; private final HashMapTree, ConcurrentSet> superClassSubscriptionsMulti; - private final Map, Collection> subscriptionsPerMessageSingle; + private final Map, ArrayList> subscriptionsPerMessageSingle; private final SubscriptionHolder subHolderSingle; private final SubscriptionHolder subHolderConcurrent; - private final HashMapTree, Collection> subscriptionsPerMessageMulti; + private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - public SubscriptionUtils(Map, Collection> subscriptionsPerMessageSingle, - HashMapTree, Collection> subscriptionsPerMessageMulti, + public SubscriptionUtils(Map, ArrayList> subscriptionsPerMessageSingle, + HashMapTree, ArrayList> subscriptionsPerMessageMulti, float loadFactor, int stripeSize) { this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti; + this.arrayVersionCache = new ConcurrentHashMapV8, Class>(32, loadFactor, stripeSize); this.isArrayCache = new ConcurrentHashMapV8, Boolean>(32, loadFactor, stripeSize); - this.superClassesCache = new ConcurrentHashMapV8, ConcurrentSet>>(32, loadFactor, stripeSize); + this.superClassesCache = new ConcurrentHashMapV8, ArrayList>>(32, loadFactor, 1); this.classHolderSingle = new ClassHolder(loadFactor, stripeSize); // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(32, loadFactor, stripeSize); + this.superClassSubscriptions = new ConcurrentHashMapV8, ArrayList>(); this.superClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, loadFactor); - this.subHolderSingle = new SubscriptionHolder(loadFactor, stripeSize); - this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize); + this.subHolderSingle = new SubscriptionHolder(); + this.subHolderConcurrent = new SubscriptionHolder(); } public void clear() { @@ -63,43 +70,113 @@ public class SubscriptionUtils { * never returns null * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) */ - public Collection> getSuperClasses(Class clazz) { - return getSuperClasses(clazz, isArray(clazz)); - } - - public final Collection> getSuperClasses(Class clazz, boolean isArray) { + public final Class[] getSuperClasses_NL(Class clazz, boolean isArray) { // this is never reset, since it never needs to be. - ConcurrentMap, ConcurrentSet>> local = this.superClassesCache; + ConcurrentMap, ArrayList>> local = this.superClassesCache; + Class[] classes; - ClassHolder classHolderSingle = this.classHolderSingle; - ConcurrentSet> classes = classHolderSingle.get(); + ArrayList> arrayList = local.get(clazz); - ConcurrentSet> putIfAbsent = local.putIfAbsent(clazz, classes); - if (putIfAbsent == null) { - // we are the first one in the map - classHolderSingle.set(classHolderSingle.initialValue()); - - // it doesn't matter if concurrent access stomps on values, since they are always the same. + if (arrayList != null) { + classes = arrayList.toArray(SUPER_CLASS_EMPTY); + } else { + // get all super types of class Collection> superTypes = ReflectionUtils.getSuperTypes(clazz); + arrayList = new ArrayList>(superTypes.size()); Iterator> iterator; Class c; for (iterator = superTypes.iterator(); iterator.hasNext();) { c = iterator.next(); + if (isArray) { c = getArrayClass(c); } if (c != clazz) { - classes.add(c); + arrayList.add(c); } } - return classes; - } else { - // someone beat us - return putIfAbsent; + local.put(clazz, arrayList); + classes = arrayList.toArray(SUPER_CLASS_EMPTY); } + + return classes; + } + + // called inside sub/unsub write lock + public void cacheSuperClasses(Class clazz) { + // TODO Auto-generated method stub + + } + + // called inside sub/unsub write lock + public void cacheSuperClasses(Class clazz, boolean isArray) { + // TODO Auto-generated method stub + + } + + public final Class[] getSuperClasses(Class clazz, boolean isArray) { + // this is never reset, since it never needs to be. + ConcurrentMap, ArrayList>> local = this.superClassesCache; + Class[] classes; + + StampedLock lock = this.superClassLock; + long stamp = lock.tryOptimisticRead(); + + if (stamp > 0) { + ArrayList> arrayList = local.get(clazz); + if (arrayList != null) { + classes = arrayList.toArray(SUPER_CLASS_EMPTY); + + if (lock.validate(stamp)) { + return classes; + } else { + stamp = lock.readLock(); + + arrayList = local.get(clazz); + if (arrayList != null) { + classes = arrayList.toArray(SUPER_CLASS_EMPTY); + lock.unlockRead(stamp); + return classes; + } + } + } + } + + // unable to get a valid subscription. Have to acquire a write lock + long origStamp = stamp; + if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { + lock.unlockRead(origStamp); + stamp = lock.writeLock(); + } + + + // get all super types of class + Collection> superTypes = ReflectionUtils.getSuperTypes(clazz); + ArrayList> arrayList = new ArrayList>(superTypes.size()); + Iterator> iterator; + Class c; + + for (iterator = superTypes.iterator(); iterator.hasNext();) { + c = iterator.next(); + + if (isArray) { + c = getArrayClass(c); + } + + if (c != clazz) { + arrayList.add(c); + } + } + + local.put(clazz, arrayList); + classes = arrayList.toArray(SUPER_CLASS_EMPTY); + + lock.unlockWrite(stamp); + + return classes; } /** @@ -145,135 +222,192 @@ public class SubscriptionUtils { } - // CAN NOT RETURN NULL - // ALSO checks to see if the superClass accepts subtypes. - public final Collection getSuperSubscriptions(Class superType) { + private static Subscription[] EMPTY = new Subscription[0]; + + private StampedLock superSubLock = new StampedLock(); + + /** + * Returns an array copy of the super subscriptions for the specified type. + * + * This ALSO checks to see if the superClass accepts subtypes. + * + * @return CAN NOT RETURN NULL + */ + public final Subscription[] getSuperSubscriptions(Class superType) { // whenever our subscriptions change, this map is cleared. - ConcurrentMap, ConcurrentSet> local = this.superClassSubscriptions; + ConcurrentMap, ArrayList> local = this.superClassSubscriptions; + Subscription[] subscriptions; +// +// StampedLock lock = this.superSubLock; +// long stamp = lock.tryOptimisticRead(); +// +// if (stamp > 0) { +// ArrayList arrayList = local.get(superType); +// if (arrayList != null) { +// subscriptions = arrayList.toArray(EMPTY); +// +// if (lock.validate(stamp)) { +// return subscriptions; +// } else { +// stamp = lock.readLock(); +// +// arrayList = local.get(superType); +// if (arrayList != null) { +// subscriptions = arrayList.toArray(EMPTY); +// lock.unlockRead(stamp); +// return subscriptions; +// } +// +// // unable to get a valid subscription. Have to acquire a write lock +// long origStamp = stamp; +// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { +// lock.unlock(origStamp); +// stamp = lock.writeLock(); +// } +// } +// } else { +// // unable to get a valid subscription. Have to acquire a write lock +// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { +// stamp = lock.writeLock(); +// } +// } +// } else { +// // unable to get a valid subscription. Have to acquire a write lock +// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { +// stamp = lock.writeLock(); +// } +// } - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - ConcurrentSet subsPerType = subHolderConcurrent.get(); + ArrayList arrayList = local.get(superType); + if (arrayList != null) { + subscriptions = arrayList.toArray(EMPTY); - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(superType, subsPerType); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderConcurrent.set(subHolderConcurrent.initialValue()); +// lock.unlockWrite(stamp); + return subscriptions; + } - Collection> types = getSuperClasses(superType); - if (types.isEmpty()) { - return subsPerType; - } + // array was null, return EMPTY collection + Class[] types = getSuperClasses_NL(superType, isArray(superType)); + int length = types.length; + if (length == 0) { + local.put(superType, new ArrayList(0)); +// lock.unlockWrite(stamp); + return EMPTY; + } - Map, Collection> local2 = this.subscriptionsPerMessageSingle; + // types was not empty, so get subscriptions for each type and collate them + Map, ArrayList> local2 = this.subscriptionsPerMessageSingle; + Class superClass; - Class superClass; - Iterator> iterator; + ArrayList subs; + Subscription sub; + arrayList = new ArrayList(16); - Iterator subIterator; - Subscription sub; - for (iterator = types.iterator(); iterator.hasNext();) { - superClass = iterator.next(); + for (int i=0;i subs = local2.get(superClass); - if (subs != null) { - for (subIterator = subs.iterator(); subIterator.hasNext();) { - sub = subIterator.next(); - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } + if (subs != null) { + for (int j=0;j getSuperSubscriptions(Class superType1, Class superType2) { - HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; - - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2); +// HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; +// +// // whenever our subscriptions change, this map is cleared. +// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2); ConcurrentSet subsPerType = null; - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderSingle.set(subHolderSingle.initialValue()); - - // whenever our subscriptions change, this map is cleared. - Collection> types1 = getSuperClasses(superType1); - - if (types1 != null) { - Collection> types2 = getSuperClasses(superType2); - - Collection subs; - HashMapTree, Collection> leaf1; - HashMapTree, Collection> leaf2; - - Class eventSuperType1; - Class eventSuperType2; - - Iterator> iterator1; - Iterator> iterator2; - - Iterator subIterator; - Subscription sub; - - for (iterator1 = types1.iterator(); iterator1.hasNext();) { - eventSuperType1 = iterator1.next(); - - boolean type1Matches = eventSuperType1 == superType1; - if (type1Matches) { - continue; - } - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null && types2 != null) { - for (iterator2 = types2.iterator(); iterator2.hasNext();) { - eventSuperType2 = iterator2.next(); - - if (type1Matches && eventSuperType2 == superType2) { - continue; - } - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - subs = leaf2.getValue(); - if (subs != null) { - for (subIterator = subs.iterator(); subIterator.hasNext();) { - sub = subIterator.next(); - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - } - } else { - // someone beat us - subsPerType = putIfAbsent; - } - } +// +// // we DO NOT care about duplicate, because the answers will be the same +// if (subsPerTypeLeaf != null) { +// // if the leaf exists, then the value exists. +// subsPerType = subsPerTypeLeaf.getValue(); +// } else { +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.get(); +// +// // cache our subscriptions for super classes, so that their access can be fast! +// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); +// if (putIfAbsent == null) { +// // we are the first one in the map +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// // whenever our subscriptions change, this map is cleared. +// Collection> types1 = getSuperClasses(superType1); +// +// if (types1 != null) { +// Collection> types2 = getSuperClasses(superType2); +// +// Collection subs; +// HashMapTree, Collection> leaf1; +// HashMapTree, Collection> leaf2; +// +// Class eventSuperType1; +// Class eventSuperType2; +// +// Iterator> iterator1; +// Iterator> iterator2; +// +// Iterator subIterator; +// Subscription sub; +// +// for (iterator1 = types1.iterator(); iterator1.hasNext();) { +// eventSuperType1 = iterator1.next(); +// +// boolean type1Matches = eventSuperType1 == superType1; +// if (type1Matches) { +// continue; +// } +// +// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); +// if (leaf1 != null && types2 != null) { +// for (iterator2 = types2.iterator(); iterator2.hasNext();) { +// eventSuperType2 = iterator2.next(); +// +// if (type1Matches && eventSuperType2 == superType2) { +// continue; +// } +// +// leaf2 = leaf1.getLeaf(eventSuperType2); +// +// if (leaf2 != null) { +// subs = leaf2.getValue(); +// if (subs != null) { +// for (subIterator = subs.iterator(); subIterator.hasNext();) { +// sub = subIterator.next(); +// if (sub.acceptsSubtypes()) { +// subsPerType.add(sub); +// } +// } +// } +// } +// } +// } +// } +// } +// } else { +// // someone beat us +// subsPerType = putIfAbsent; +// } +// } return subsPerType; } @@ -281,101 +415,101 @@ public class SubscriptionUtils { // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { - HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; - - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); - ConcurrentSet subsPerType; - - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderSingle.set(subHolderSingle.initialValue()); - - Collection> types1 = getSuperClasses(superType1); - - if (types1 != null) { - Collection> types2 = getSuperClasses(superType2); - Collection> types3 = getSuperClasses(superType3); - - Collection subs; - HashMapTree, Collection> leaf1; - HashMapTree, Collection> leaf2; - HashMapTree, Collection> leaf3; - - Class eventSuperType1; - Class eventSuperType2; - Class eventSuperType3; - - Iterator> iterator1; - Iterator> iterator2; - Iterator> iterator3; - - Iterator subIterator; - Subscription sub; - - for (iterator1 = types1.iterator(); iterator1.hasNext();) { - eventSuperType1 = iterator1.next(); - - boolean type1Matches = eventSuperType1 == superType1; - if (type1Matches) { - continue; - } - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null && types2 != null) { - for (iterator2 = types2.iterator(); iterator2.hasNext();) { - eventSuperType2 = iterator2.next(); - - boolean type12Matches = type1Matches && eventSuperType2 == superType2; - if (type12Matches) { - continue; - } - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null && types3 != null) { - for (iterator3 = types3.iterator(); iterator3.hasNext();) { - eventSuperType3 = iterator3.next(); - - if (type12Matches && eventSuperType3 == superType3) { - continue; - } - - leaf3 = leaf2.getLeaf(eventSuperType3); - - if (leaf3 != null) { - subs = leaf3.getValue(); - if (subs != null) { - for (subIterator = subs.iterator(); subIterator.hasNext();) { - sub = subIterator.next(); - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - } - } - } - } else { - // someone beat us - subsPerType = putIfAbsent; - } - } +// HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; +// +// // whenever our subscriptions change, this map is cleared. +// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); + ConcurrentSet subsPerType = null; +// +// +// // we DO NOT care about duplicate, because the answers will be the same +// if (subsPerTypeLeaf != null) { +// // if the leaf exists, then the value exists. +// subsPerType = subsPerTypeLeaf.getValue(); +// } else { +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.get(); +// +// // cache our subscriptions for super classes, so that their access can be fast! +// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); +// if (putIfAbsent == null) { +// // we are the first one in the map +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// Collection> types1 = getSuperClasses(superType1); +// +// if (types1 != null) { +// Collection> types2 = getSuperClasses(superType2); +// Collection> types3 = getSuperClasses(superType3); +// +// Collection subs; +// HashMapTree, Collection> leaf1; +// HashMapTree, Collection> leaf2; +// HashMapTree, Collection> leaf3; +// +// Class eventSuperType1; +// Class eventSuperType2; +// Class eventSuperType3; +// +// Iterator> iterator1; +// Iterator> iterator2; +// Iterator> iterator3; +// +// Iterator subIterator; +// Subscription sub; +// +// for (iterator1 = types1.iterator(); iterator1.hasNext();) { +// eventSuperType1 = iterator1.next(); +// +// boolean type1Matches = eventSuperType1 == superType1; +// if (type1Matches) { +// continue; +// } +// +// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); +// if (leaf1 != null && types2 != null) { +// for (iterator2 = types2.iterator(); iterator2.hasNext();) { +// eventSuperType2 = iterator2.next(); +// +// boolean type12Matches = type1Matches && eventSuperType2 == superType2; +// if (type12Matches) { +// continue; +// } +// +// leaf2 = leaf1.getLeaf(eventSuperType2); +// +// if (leaf2 != null && types3 != null) { +// for (iterator3 = types3.iterator(); iterator3.hasNext();) { +// eventSuperType3 = iterator3.next(); +// +// if (type12Matches && eventSuperType3 == superType3) { +// continue; +// } +// +// leaf3 = leaf2.getLeaf(eventSuperType3); +// +// if (leaf3 != null) { +// subs = leaf3.getValue(); +// if (subs != null) { +// for (subIterator = subs.iterator(); subIterator.hasNext();) { +// sub = subIterator.next(); +// if (sub.acceptsSubtypes()) { +// subsPerType.add(sub); +// } +// } +// } +// } +// } +// } +// } +// } +// } +// } +// } else { +// // someone beat us +// subsPerType = putIfAbsent; +// } +// } return subsPerType; } diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index b1d9da3..a020328 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -1,7 +1,6 @@ package dorkbox.util.messagebus.common; -import java.util.Collection; -import java.util.Iterator; +import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -20,10 +19,10 @@ public class VarArgUtils { private final int stripeSize; private final SubscriptionUtils utils; - private final Map, Collection> subscriptionsPerMessageSingle; + private final Map, ArrayList> subscriptionsPerMessageSingle; - public VarArgUtils(SubscriptionUtils utils, Map, Collection> subscriptionsPerMessageSingle, + public VarArgUtils(SubscriptionUtils utils, Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) { this.utils = utils; @@ -35,7 +34,7 @@ public class VarArgUtils { this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(16, loadFactor, stripeSize); this.varArgSuperClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, loadFactor); - this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize); + this.subHolderConcurrent = new SubscriptionHolder(); } @@ -50,38 +49,40 @@ public class VarArgUtils { // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions public ConcurrentSet getVarArgSubscriptions(Class messageClass) { - ConcurrentMap, ConcurrentSet> local = this.varArgSubscriptions; +// ConcurrentMap, ConcurrentSet> local = this.varArgSubscriptions; +// +// // whenever our subscriptions change, this map is cleared. +// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; +// ConcurrentSet subsPerType = subHolderConcurrent.get(); +// +// // cache our subscriptions for super classes, so that their access can be fast! +// ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); +// if (putIfAbsent == null) { +// // we are the first one in the map +// subHolderConcurrent.set(subHolderConcurrent.initialValue()); +// +// // this caches our array type. This is never cleared. +// Class arrayVersion = this.utils.getArrayClass(messageClass); +// +// Iterator iterator; +// Subscription sub; +// +// Collection subs = this.subscriptionsPerMessageSingle.get(arrayVersion); +// if (subs != null) { +// for (iterator = subs.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// if (sub.acceptsVarArgs()) { +// subsPerType.add(sub); +// } +// } +// } +// return subsPerType; +// } else { +// // someone beat us +// return putIfAbsent; +// } - // whenever our subscriptions change, this map is cleared. - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - ConcurrentSet subsPerType = subHolderConcurrent.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - - // this caches our array type. This is never cleared. - Class arrayVersion = this.utils.getArrayClass(messageClass); - - Iterator iterator; - Subscription sub; - - Collection subs = this.subscriptionsPerMessageSingle.get(arrayVersion); - if (subs != null) { - for (iterator = subs.iterator(); iterator.hasNext();) { - sub = iterator.next(); - if (sub.acceptsVarArgs()) { - subsPerType.add(sub); - } - } - } - return subsPerType; - } else { - // someone beat us - return putIfAbsent; - } + return null; } @@ -90,53 +91,54 @@ public class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass) { - // whenever our subscriptions change, this map is cleared. - ConcurrentMap, ConcurrentSet> local = this.varArgSuperClassSubscriptions; - - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - ConcurrentSet subsPerType = subHolderConcurrent.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); - - if (putIfAbsent == null) { - // we are the first one in the map - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - - Class arrayVersion = this.utils.getArrayClass(messageClass); - Collection> types = this.utils.getSuperClasses(arrayVersion, true); - if (types.isEmpty()) { - return subsPerType; - } - - Map, Collection> local2 = this.subscriptionsPerMessageSingle; - - Iterator> iterator; - Class superClass; - - Iterator subIterator; - Subscription sub; - - - for (iterator = types.iterator(); iterator.hasNext();) { - superClass = iterator.next(); - - Collection subs = local2.get(superClass); - if (subs != null) { - for (subIterator = subs.iterator(); subIterator.hasNext();) { - sub = subIterator.next(); - if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { - subsPerType.add(sub); - } - } - } - } - return subsPerType; - } else { - // someone beat us - return putIfAbsent; - } +// // whenever our subscriptions change, this map is cleared. +// ConcurrentMap, ConcurrentSet> local = this.varArgSuperClassSubscriptions; +// +// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; +// ConcurrentSet subsPerType = subHolderConcurrent.get(); +// +// // cache our subscriptions for super classes, so that their access can be fast! +// ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); +// +// if (putIfAbsent == null) { +// // we are the first one in the map +// subHolderConcurrent.set(subHolderConcurrent.initialValue()); +// +// Class arrayVersion = this.utils.getArrayClass(messageClass); +// Collection> types = this.utils.getSuperClasses(arrayVersion, true); +// if (types.isEmpty()) { +// return subsPerType; +// } +// +// Map, Collection> local2 = this.subscriptionsPerMessageSingle; +// +// Iterator> iterator; +// Class superClass; +// +// Iterator subIterator; +// Subscription sub; +// +// +// for (iterator = types.iterator(); iterator.hasNext();) { +// superClass = iterator.next(); +// +// Collection subs = local2.get(superClass); +// if (subs != null) { +// for (subIterator = subs.iterator(); subIterator.hasNext();) { +// sub = subIterator.next(); +// if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { +// subsPerType.add(sub); +// } +// } +// } +// } +// return subsPerType; +// } else { +// // someone beat us +// return putIfAbsent; +// } + return null; } @@ -144,45 +146,46 @@ public class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { - HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; - - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); - ConcurrentSet subsPerType = null; - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - subsPerType = subHolderConcurrent.get(); - - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); - if (putIfAbsent != null) { - // someone beat us - subsPerType = putIfAbsent; - } else { - // the message class types are not the same, so look for a common superClass varArg subscription. - // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); - ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); - - Iterator iterator; - Subscription sub; - - for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { - sub = iterator.next(); - if (varargSuperSubscriptions2.contains(sub)) { - subsPerType.add(sub); - } - } - - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - } - } - - return subsPerType; +// HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; +// +// // whenever our subscriptions change, this map is cleared. +// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); +// ConcurrentSet subsPerType = null; +// +// // we DO NOT care about duplicate, because the answers will be the same +// if (subsPerTypeLeaf != null) { +// // if the leaf exists, then the value exists. +// subsPerType = subsPerTypeLeaf.getValue(); +// } else { +// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; +// subsPerType = subHolderConcurrent.get(); +// +// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); +// if (putIfAbsent != null) { +// // someone beat us +// subsPerType = putIfAbsent; +// } else { +// // the message class types are not the same, so look for a common superClass varArg subscription. +// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages +// ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); +// ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); +// +// Iterator iterator; +// Subscription sub; +// +// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// if (varargSuperSubscriptions2.contains(sub)) { +// subsPerType.add(sub); +// } +// } +// +// subHolderConcurrent.set(subHolderConcurrent.initialValue()); +// } +// } +// +// return subsPerType; + return null; } @@ -190,46 +193,48 @@ public class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions public ConcurrentSet getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; +// HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; +// +// // whenever our subscriptions change, this map is cleared. +// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); +// ConcurrentSet subsPerType = null; +// +// // we DO NOT care about duplicate, because the answers will be the same +// if (subsPerTypeLeaf != null) { +// // if the leaf exists, then the value exists. +// subsPerType = subsPerTypeLeaf.getValue(); +// } else { +// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; +// subsPerType = subHolderConcurrent.get(); +// +// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); +// if (putIfAbsent != null) { +// // someone beat us +// subsPerType = putIfAbsent; +// } else { +// // the message class types are not the same, so look for a common superClass varArg subscription. +// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages +// ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); +// ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); +// ConcurrentSet varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); +// +// Iterator iterator; +// Subscription sub; +// +// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { +// subsPerType.add(sub); +// } +// } +// +// subHolderConcurrent.set(subHolderConcurrent.initialValue()); +// } +// } +// +// return subsPerType; - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); - ConcurrentSet subsPerType = null; - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - subsPerType = subHolderConcurrent.get(); - - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); - if (putIfAbsent != null) { - // someone beat us - subsPerType = putIfAbsent; - } else { - // the message class types are not the same, so look for a common superClass varArg subscription. - // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); - ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); - ConcurrentSet varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); - - Iterator iterator; - Subscription sub; - - for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { - sub = iterator.next(); - if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { - subsPerType.add(sub); - } - } - - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - } - } - - return subsPerType; + return null; } } diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java index 4aa581b..fbaba63 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java @@ -1,22 +1,18 @@ package dorkbox.util.messagebus.common.thread; +import java.util.ArrayList; + import dorkbox.util.messagebus.subscription.Subscription; -public class SubscriptionHolder extends ThreadLocal> { +public class SubscriptionHolder extends ThreadLocal> { - private final float loadFactor; - private final int stripeSize; - - public SubscriptionHolder(float loadFactor, int stripeSize) { + public SubscriptionHolder() { super(); - - this.loadFactor = loadFactor; - this.stripeSize = stripeSize; } @Override - public ConcurrentSet initialValue() { - return new ConcurrentSet(16, this.loadFactor, this.stripeSize); + public ArrayList initialValue() { + return new ArrayList(); } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 8beec9e..4913435 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -6,7 +6,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.omg.CORBA.BooleanHolder; -import dorkbox.util.messagebus.common.StrongConcurrentSet; +import com.esotericsoftware.reflectasm.MethodAccess; + +import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; @@ -41,9 +43,9 @@ public class Subscription { public Subscription(MessageHandler handler) { this.handlerMetadata = handler; -// this.listeners = new StrongConcurrentSetV8(16, 0.85F); - this.listeners = new StrongConcurrentSet(16, 0.85F); -// this.listeners = new ConcurrentLinkedQueue2(); + this.listeners = new StrongConcurrentSetV8(16, 0.85F, 16); +// this.listeners = new StrongConcurrentSet(16, 0.85F); +// this.listeners = new ConcurrentLinkedQueue2(); IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); if (handler.isSynchronized()) { @@ -101,10 +103,10 @@ public class Subscription { /** * @return true if there were listeners for this publication, false if there was nothing */ - public void publish(Object message) throws Throwable { -// MethodAccess handler = this.handlerMetadata.getHandler(); -// int handleIndex = this.handlerMetadata.getMethodIndex(); -// IHandlerInvocation invocation = this.invocation; + public final void publish(final Object message) throws Throwable { + MethodAccess handler = this.handlerMetadata.getHandler(); + int handleIndex = this.handlerMetadata.getMethodIndex(); + IHandlerInvocation invocation = this.invocation; Iterator iterator; Object listener; @@ -112,9 +114,9 @@ public class Subscription { for (iterator = this.listeners.iterator(); iterator.hasNext();) { listener = iterator.next(); - this.c++; +// this.c++; -// invocation.invoke(listener, handler, handleIndex, message); + invocation.invoke(listener, handler, handleIndex, message); } } diff --git a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java index bac5d2e..c59b66f 100644 --- a/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/src/test/java/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -1,6 +1,7 @@ package dorkbox.util.messagebus.common; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -44,13 +45,13 @@ public class SubscriptionValidator extends AssertSupport{ // we split subs + superSubs into TWO calls. Collection collection = new ArrayDeque(8); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); + Subscription[] subscriptions = manager.getSubscriptionsByMessageType(messageType); if (subscriptions != null) { - collection.addAll(subscriptions); + collection.addAll(Arrays.asList(subscriptions)); } - Collection superSubs = manager.getSuperSubscriptions(messageType); + Subscription[] superSubs = manager.getSuperSubscriptions(messageType); if (superSubs != null) { - collection.addAll(superSubs); + collection.addAll(Arrays.asList(superSubs)); } assertEquals(validationEntries.size(), collection.size());