WIP, getting better results

This commit is contained in:
nathan 2015-05-28 03:36:26 +02:00
parent a5a4cc861c
commit df14de3252
10 changed files with 733 additions and 545 deletions

View File

@ -2,7 +2,6 @@ package dorkbox.util.messagebus;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import org.jctools.util.Pow2;
@ -193,29 +192,46 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object message) {
try {
boolean subsPublished = false;
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null
boolean subsPublished = false;
Iterator<Subscription> iterator;
Subscription[] subscriptions;
Subscription sub;
subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (subscriptions != null) {
int length = subscriptions.length;
if (length > 0) {
for (int i=0;i<length;i++) {
sub = subscriptions[i];
sub.publish(message);
sub.publish(message);
}
subsPublished = true;
}
subsPublished = true;
}
// if (!this.forceExactMatches) {
// Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
if (!this.forceExactMatches) {
// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null
// // now get superClasses
// int length = superSubscriptions.length;
//
// if (length > 0) {
// for (int i=0;i<length;i++) {
// sub = superSubscriptions[i];
//
// sub.publish(message);
// }
//
// subsPublished = true;
// }
// if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
// for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
@ -257,19 +273,21 @@ public class MultiMBassador implements IMessageBus {
// }
// }
// }
// }
}
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
Subscription[] deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null) {
int length = deadSubscriptions.length;
if (length > 0) {
DeadMessage deadMessage = new DeadMessage(message);
for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
for (int i=0;i<length;i++) {
sub = deadSubscriptions[i];
// this catches all exception types
sub.publish(deadMessage);
sub.publish(deadMessage);
}
}
}
}

View File

@ -2,7 +2,6 @@ package dorkbox.util.messagebus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -37,6 +36,8 @@ import dorkbox.util.messagebus.subscription.Subscription;
* Date: 2/2/15
*/
public class SubscriptionManager {
private static final Subscription[] EMPTY = new Subscription[0];
private static final float LOAD_FACTOR = 0.8F;
// the metadata reader that is used to inspect objects passed to the subscribe method
@ -53,8 +54,8 @@ public class SubscriptionManager {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
@ -70,6 +71,7 @@ public class SubscriptionManager {
private final StampedLock lock = new StampedLock();
// private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
@ -84,8 +86,8 @@ public class SubscriptionManager {
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, this.STRIPE_SIZE);
this.subscriptionsPerMessageSingle = new HashMap<Class<?>, Collection<Subscription>>(64);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, loadFactor);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(64, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor, this.STRIPE_SIZE);
@ -97,8 +99,8 @@ public class SubscriptionManager {
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE);
this.subHolderSingle = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE);
this.subHolderConcurrent = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE);
this.subHolderSingle = new SubscriptionHolder();
this.subHolderConcurrent = new SubscriptionHolder();
}
public void shutdown() {
@ -138,7 +140,10 @@ public class SubscriptionManager {
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility,
// but for concurrency because there are race conditions here if we don't.
long stamp = this.lock.writeLock();
StampedLock lock = this.lock;
long stamp = lock.writeLock();
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
subsPerListener = subsPerListener2.get(listenerClass);
@ -153,15 +158,16 @@ public class SubscriptionManager {
}
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
}
// subsPerListener == null, so we now enter an exclusive write lock and double check.
// long origStamp = stamp;
// if ((stamp = this.lock.tryConvertToWriteLock(stamp)) == 0) {
// this.lock.unlockRead(origStamp);
// stamp = this.lock.writeLock();
// }
long origStamp = stamp;
if ((stamp = this.lock.tryConvertToWriteLock(stamp)) == 0) {
this.lock.unlockRead(origStamp);
stamp = this.lock.writeLock();
}
subsPerListener = subsPerListener2.get(listenerClass);
@ -176,6 +182,7 @@ public class SubscriptionManager {
}
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
}
@ -188,13 +195,14 @@ public class SubscriptionManager {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, Boolean.TRUE);
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
} else {
subsPerListener = new ConcurrentSet<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
VarArgPossibility varArgPossibility = this.varArgPossibility;
Map<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, Collection<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
Iterator<MessageHandler> iterator;
@ -218,33 +226,31 @@ public class SubscriptionManager {
subsPerListener2.put(listenerClass, subsPerListener);
this.lock.unlockWrite(stamp);
// writeLock.unlock();
}
}
// inside a write lock
private final Collection<Subscription> getSubsForPublication(MessageHandler messageHandler,
Map<Class<?>, Collection<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, Collection<Subscription>> subsPerMessageMulti,
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
VarArgPossibility varArgPossibility) {
Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length;
ConcurrentSet<Subscription> subsPerType;
// ConcurrentSet<Subscription> subsPerType;
SubscriptionUtils utils = this.utils;
Class<?> type0 = types[0];
switch (size) {
case 1: {
Collection<Subscription> subs = subsPerMessageSingle.get(type0);
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
// subs = new ConcurrentSet<>(16, LOAD_FACTOR, this.STRIPE_SIZE);
subs = new ArrayList<>(16);
// subs = new ConcurrentLinkedQueue2<Subscription>();
// subs = new LinkedList<Subscription>();
subs = new ArrayList<>(8);
boolean isArray = utils.isArray(type0);
if (isArray) {
@ -252,7 +258,7 @@ public class SubscriptionManager {
}
// cache the super classes
utils.getSuperClasses(type0, isArray);
utils.cacheSuperClasses(type0, isArray);
subsPerMessageSingle.put(type0, subs);
}
@ -261,63 +267,64 @@ public class SubscriptionManager {
}
case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
utils.getSuperClasses(type0);
utils.getSuperClasses(types[1]);
return subsPerType;
}
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// // cache the super classes
// utils.cacheSuperClasses(type0);
// utils.cacheSuperClasses(types[1]);
//
// return subsPerType;
// }
}
case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
utils.getSuperClasses(type0);
utils.getSuperClasses(types[1]);
utils.getSuperClasses(types[2]);
return subsPerType;
}
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// // cache the super classes
// utils.cacheSuperClasses(type0);
// utils.cacheSuperClasses(types[1]);
// utils.cacheSuperClasses(types[2]);
//
// return subsPerType;
// }
}
default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
Class<?> c;
int length = types.length;
for (int i = 0; i < length; i++) {
c = types[i];
// cache the super classes
utils.getSuperClasses(c);
}
return subsPerType;
}
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// Class<?> c;
// int length = types.length;
// for (int i = 0; i < length; i++) {
// c = types[i];
//
// // cache the super classes
// utils.cacheSuperClasses(c);
// }
//
// return subsPerType;
// }
return null;
}
}
}
@ -334,9 +341,11 @@ public class SubscriptionManager {
}
// these are concurrent collections
clearConcurrentCollections();
// clearConcurrentCollections();
long stamp = this.lock.writeLock();
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
@ -351,6 +360,7 @@ public class SubscriptionManager {
}
this.lock.unlockWrite(stamp);
// writeLock.unlock();
}
private void clearConcurrentCollections() {
@ -359,39 +369,55 @@ public class SubscriptionManager {
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
public final Subscription[] getSubscriptionsByMessageType(Class<?> messageType) {
Collection<Subscription> collection;
Collection<Subscription> subscriptions = null;
Subscription[] subscriptions = null;
long stamp = this.lock.tryOptimisticRead(); // non blocking
long stamp = this.lock.readLock();
// Lock writeLock = this.lock.readLock();
// writeLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageType);
if (collection != null) {
// subscriptions = new ArrayDeque<>(collection);
subscriptions = new ArrayList<>(collection);
// subscriptions = new LinkedList<>();
// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
// subscriptions.addAll(collection);
}
if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
stamp = this.lock.readLock();
try {
collection = this.subscriptionsPerMessageSingle.get(messageType);
if (collection != null) {
// subscriptions = new ArrayDeque<>(collection);
subscriptions = new ArrayList<>(collection);
// subscriptions = new LinkedList<>();
// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
// subscriptions.addAll(collection);
}
}
finally {
this.lock.unlockRead(stamp);
try {
collection = this.subscriptionsPerMessageSingle.get(messageType);
if (collection != null) {
subscriptions = collection.toArray(EMPTY);
}
}
finally {
this.lock.unlockRead(stamp);
// writeLock.unlock();
}
// long stamp = this.lock.tryOptimisticRead(); // non blocking
//
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// if (collection != null) {
//// subscriptions = new ArrayDeque<>(collection);
// subscriptions = new ArrayList<>(collection);
//// subscriptions = new LinkedList<>();
//// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
//
//// subscriptions.addAll(collection);
// }
//
// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
// stamp = this.lock.readLock();
// try {
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// if (collection != null) {
//// subscriptions = new ArrayDeque<>(collection);
// subscriptions = new ArrayList<>(collection);
//// subscriptions = new LinkedList<>();
//// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
//
//// subscriptions.addAll(collection);
// }
// }
// finally {
// this.lock.unlockRead(stamp);
// }
// }
return subscriptions;
}
@ -459,7 +485,7 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
public final Subscription[] getSuperSubscriptions(Class<?> superType) {
return this.utils.getSuperSubscriptions(superType);
}

View File

@ -58,6 +58,7 @@ public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T>
stamp = this.lock.writeLock();
}
changed = insert(element);
this.lock.unlock(stamp);

View File

@ -4346,4 +4346,4 @@ final Node<K,V> find(int h, Object k) {
e.getCause());
}
}
}
}

View File

@ -13,4 +13,9 @@ public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, 1));
}
public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, stripeSize));
}
}

View File

@ -1,6 +1,7 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@ -8,50 +9,56 @@ import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.StampedLock;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionUtils {
private static final Class<?>[] SUPER_CLASS_EMPTY = new Class<?>[0];
private StampedLock superClassLock = new StampedLock();
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, ConcurrentSet<Class<?>>> superClassesCache;
private final ConcurrentMap<Class<?>, ArrayList<Class<?>>> superClassesCache;
private final ClassHolder classHolderSingle;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptions;
private final ConcurrentMap<Class<?>, ArrayList<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
public SubscriptionUtils(Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti,
public SubscriptionUtils(Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti,
float loadFactor, int stripeSize) {
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Class<?>>>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, ArrayList<Class<?>>>(32, loadFactor, 1);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(32, loadFactor, stripeSize);
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>();
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor);
this.subHolderSingle = new SubscriptionHolder(loadFactor, stripeSize);
this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize);
this.subHolderSingle = new SubscriptionHolder();
this.subHolderConcurrent = new SubscriptionHolder();
}
public void clear() {
@ -63,43 +70,113 @@ public class SubscriptionUtils {
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/
public Collection<Class<?>> getSuperClasses(Class<?> clazz) {
return getSuperClasses(clazz, isArray(clazz));
}
public final Collection<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
public final Class<?>[] getSuperClasses_NL(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, ConcurrentSet<Class<?>>> local = this.superClassesCache;
ConcurrentMap<Class<?>, ArrayList<Class<?>>> local = this.superClassesCache;
Class<?>[] classes;
ClassHolder classHolderSingle = this.classHolderSingle;
ConcurrentSet<Class<?>> classes = classHolderSingle.get();
ArrayList<Class<?>> arrayList = local.get(clazz);
ConcurrentSet<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
if (putIfAbsent == null) {
// we are the first one in the map
classHolderSingle.set(classHolderSingle.initialValue());
// it doesn't matter if concurrent access stomps on values, since they are always the same.
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
} else {
// get all super types of class
Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
arrayList = new ArrayList<Class<?>>(superTypes.size());
Iterator<Class<?>> iterator;
Class<?> c;
for (iterator = superTypes.iterator(); iterator.hasNext();) {
c = iterator.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
classes.add(c);
arrayList.add(c);
}
}
return classes;
} else {
// someone beat us
return putIfAbsent;
local.put(clazz, arrayList);
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
}
return classes;
}
// called inside sub/unsub write lock
public void cacheSuperClasses(Class<?> clazz) {
// TODO Auto-generated method stub
}
// called inside sub/unsub write lock
public void cacheSuperClasses(Class<?> clazz, boolean isArray) {
// TODO Auto-generated method stub
}
public final Class<?>[] getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, ArrayList<Class<?>>> local = this.superClassesCache;
Class<?>[] classes;
StampedLock lock = this.superClassLock;
long stamp = lock.tryOptimisticRead();
if (stamp > 0) {
ArrayList<Class<?>> arrayList = local.get(clazz);
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
if (lock.validate(stamp)) {
return classes;
} else {
stamp = lock.readLock();
arrayList = local.get(clazz);
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
lock.unlockRead(stamp);
return classes;
}
}
}
}
// unable to get a valid subscription. Have to acquire a write lock
long origStamp = stamp;
if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
lock.unlockRead(origStamp);
stamp = lock.writeLock();
}
// get all super types of class
Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size());
Iterator<Class<?>> iterator;
Class<?> c;
for (iterator = superTypes.iterator(); iterator.hasNext();) {
c = iterator.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
arrayList.add(c);
}
}
local.put(clazz, arrayList);
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
lock.unlockWrite(stamp);
return classes;
}
/**
@ -145,135 +222,192 @@ public class SubscriptionUtils {
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
private static Subscription[] EMPTY = new Subscription[0];
private StampedLock superSubLock = new StampedLock();
/**
* Returns an array copy of the super subscriptions for the specified type.
*
* This ALSO checks to see if the superClass accepts subtypes.
*
* @return CAN NOT RETURN NULL
*/
public final Subscription[] getSuperSubscriptions(Class<?> superType) {
// whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptions;
ConcurrentMap<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
Subscription[] subscriptions;
//
// StampedLock lock = this.superSubLock;
// long stamp = lock.tryOptimisticRead();
//
// if (stamp > 0) {
// ArrayList<Subscription> arrayList = local.get(superType);
// if (arrayList != null) {
// subscriptions = arrayList.toArray(EMPTY);
//
// if (lock.validate(stamp)) {
// return subscriptions;
// } else {
// stamp = lock.readLock();
//
// arrayList = local.get(superType);
// if (arrayList != null) {
// subscriptions = arrayList.toArray(EMPTY);
// lock.unlockRead(stamp);
// return subscriptions;
// }
//
// // unable to get a valid subscription. Have to acquire a write lock
// long origStamp = stamp;
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// lock.unlock(origStamp);
// stamp = lock.writeLock();
// }
// }
// } else {
// // unable to get a valid subscription. Have to acquire a write lock
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// stamp = lock.writeLock();
// }
// }
// } else {
// // unable to get a valid subscription. Have to acquire a write lock
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// stamp = lock.writeLock();
// }
// }
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
ArrayList<Subscription> arrayList = local.get(superType);
if (arrayList != null) {
subscriptions = arrayList.toArray(EMPTY);
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
// lock.unlockWrite(stamp);
return subscriptions;
}
Collection<Class<?>> types = getSuperClasses(superType);
if (types.isEmpty()) {
return subsPerType;
}
// array was null, return EMPTY collection
Class<?>[] types = getSuperClasses_NL(superType, isArray(superType));
int length = types.length;
if (length == 0) {
local.put(superType, new ArrayList<Subscription>(0));
// lock.unlockWrite(stamp);
return EMPTY;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
// types was not empty, so get subscriptions for each type and collate them
Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Class<?> superClass;
Class<?> superClass;
Iterator<Class<?>> iterator;
ArrayList<Subscription> subs;
Subscription sub;
arrayList = new ArrayList<Subscription>(16);
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator = types.iterator(); iterator.hasNext();) {
superClass = iterator.next();
for (int i=0;i<length;i++) {
superClass = types[i];
subs = local2.get(superClass);
Collection<Subscription> subs = local2.get(superClass);
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
if (subs != null) {
for (int j=0;j<subs.size();j++) {
sub = subs.get(j);
if (sub.acceptsSubtypes()) {
arrayList.add(sub);
}
}
}
return subsPerType;
} else {
// someone beat us
return putIfAbsent;
}
local.put(superType, arrayList);
subscriptions = arrayList.toArray(EMPTY);
// lock.unlockWrite(stamp);
return subscriptions;
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
ConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
// whenever our subscriptions change, this map is cleared.
Collection<Class<?>> types1 = getSuperClasses(superType1);
if (types1 != null) {
Collection<Class<?>> types2 = getSuperClasses(superType2);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Iterator<Class<?>> iterator1;
Iterator<Class<?>> iterator2;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator1 = types1.iterator(); iterator1.hasNext();) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null && types2 != null) {
for (iterator2 = types2.iterator(); iterator2.hasNext();) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderSingle.set(subHolderSingle.initialValue());
//
// // whenever our subscriptions change, this map is cleared.
// Collection<Class<?>> types1 = getSuperClasses(superType1);
//
// if (types1 != null) {
// Collection<Class<?>> types2 = getSuperClasses(superType2);
//
// Collection<Subscription> subs;
// HashMapTree<Class<?>, Collection<Subscription>> leaf1;
// HashMapTree<Class<?>, Collection<Subscription>> leaf2;
//
// Class<?> eventSuperType1;
// Class<?> eventSuperType2;
//
// Iterator<Class<?>> iterator1;
// Iterator<Class<?>> iterator2;
//
// Iterator<Subscription> subIterator;
// Subscription sub;
//
// for (iterator1 = types1.iterator(); iterator1.hasNext();) {
// eventSuperType1 = iterator1.next();
//
// boolean type1Matches = eventSuperType1 == superType1;
// if (type1Matches) {
// continue;
// }
//
// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
// if (leaf1 != null && types2 != null) {
// for (iterator2 = types2.iterator(); iterator2.hasNext();) {
// eventSuperType2 = iterator2.next();
//
// if (type1Matches && eventSuperType2 == superType2) {
// continue;
// }
//
// leaf2 = leaf1.getLeaf(eventSuperType2);
//
// if (leaf2 != null) {
// subs = leaf2.getValue();
// if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next();
// if (sub.acceptsSubtypes()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// }
// }
// }
// }
// } else {
// // someone beat us
// subsPerType = putIfAbsent;
// }
// }
return subsPerType;
}
@ -281,101 +415,101 @@ public class SubscriptionUtils {
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
ConcurrentSet<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
Collection<Class<?>> types1 = getSuperClasses(superType1);
if (types1 != null) {
Collection<Class<?>> types2 = getSuperClasses(superType2);
Collection<Class<?>> types3 = getSuperClasses(superType3);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
HashMapTree<Class<?>, Collection<Subscription>> leaf3;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Class<?> eventSuperType3;
Iterator<Class<?>> iterator1;
Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator1 = types1.iterator(); iterator1.hasNext();) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null && types2 != null) {
for (iterator2 = types2.iterator(); iterator2.hasNext();) {
eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2;
if (type12Matches) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null && types3 != null) {
for (iterator3 = types3.iterator(); iterator3.hasNext();) {
eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) {
continue;
}
leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) {
subs = leaf3.getValue();
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
}
}
} else {
// someone beat us
subsPerType = putIfAbsent;
}
}
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
ConcurrentSet<Subscription> subsPerType = null;
//
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.get();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderSingle.set(subHolderSingle.initialValue());
//
// Collection<Class<?>> types1 = getSuperClasses(superType1);
//
// if (types1 != null) {
// Collection<Class<?>> types2 = getSuperClasses(superType2);
// Collection<Class<?>> types3 = getSuperClasses(superType3);
//
// Collection<Subscription> subs;
// HashMapTree<Class<?>, Collection<Subscription>> leaf1;
// HashMapTree<Class<?>, Collection<Subscription>> leaf2;
// HashMapTree<Class<?>, Collection<Subscription>> leaf3;
//
// Class<?> eventSuperType1;
// Class<?> eventSuperType2;
// Class<?> eventSuperType3;
//
// Iterator<Class<?>> iterator1;
// Iterator<Class<?>> iterator2;
// Iterator<Class<?>> iterator3;
//
// Iterator<Subscription> subIterator;
// Subscription sub;
//
// for (iterator1 = types1.iterator(); iterator1.hasNext();) {
// eventSuperType1 = iterator1.next();
//
// boolean type1Matches = eventSuperType1 == superType1;
// if (type1Matches) {
// continue;
// }
//
// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
// if (leaf1 != null && types2 != null) {
// for (iterator2 = types2.iterator(); iterator2.hasNext();) {
// eventSuperType2 = iterator2.next();
//
// boolean type12Matches = type1Matches && eventSuperType2 == superType2;
// if (type12Matches) {
// continue;
// }
//
// leaf2 = leaf1.getLeaf(eventSuperType2);
//
// if (leaf2 != null && types3 != null) {
// for (iterator3 = types3.iterator(); iterator3.hasNext();) {
// eventSuperType3 = iterator3.next();
//
// if (type12Matches && eventSuperType3 == superType3) {
// continue;
// }
//
// leaf3 = leaf2.getLeaf(eventSuperType3);
//
// if (leaf3 != null) {
// subs = leaf3.getValue();
// if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next();
// if (sub.acceptsSubtypes()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// }
// }
// }
// }
// }
// }
// } else {
// // someone beat us
// subsPerType = putIfAbsent;
// }
// }
return subsPerType;
}

View File

@ -1,7 +1,6 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -20,10 +19,10 @@ public class VarArgUtils {
private final int stripeSize;
private final SubscriptionUtils utils;
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle,
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
float loadFactor, int stripeSize) {
this.utils = utils;
@ -35,7 +34,7 @@ public class VarArgUtils {
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor);
this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize);
this.subHolderConcurrent = new SubscriptionHolder();
}
@ -50,38 +49,40 @@ public class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSubscriptions(Class<?> messageClass) {
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSubscriptions;
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSubscriptions;
//
// // whenever our subscriptions change, this map is cleared.
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
//
// // this caches our array type. This is never cleared.
// Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
// if (subs != null) {
// for (iterator = subs.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (sub.acceptsVarArgs()) {
// subsPerType.add(sub);
// }
// }
// }
// return subsPerType;
// } else {
// // someone beat us
// return putIfAbsent;
// }
// whenever our subscriptions change, this map is cleared.
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
// this caches our array type. This is never cleared.
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
Iterator<Subscription> iterator;
Subscription sub;
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
if (subs != null) {
for (iterator = subs.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
}
}
return subsPerType;
} else {
// someone beat us
return putIfAbsent;
}
return null;
}
@ -90,53 +91,54 @@ public class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast!
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
Collection<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
if (types.isEmpty()) {
return subsPerType;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Iterator<Class<?>> iterator;
Class<?> superClass;
Iterator<Subscription> subIterator;
Subscription sub;
for (iterator = types.iterator(); iterator.hasNext();) {
superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null) {
for (subIterator = subs.iterator(); subIterator.hasNext();) {
sub = subIterator.next();
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
}
}
}
return subsPerType;
} else {
// someone beat us
return putIfAbsent;
}
// // whenever our subscriptions change, this map is cleared.
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
//
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
//
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
//
// Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
// Collection<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
// if (types.isEmpty()) {
// return subsPerType;
// }
//
// Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
//
// Iterator<Class<?>> iterator;
// Class<?> superClass;
//
// Iterator<Subscription> subIterator;
// Subscription sub;
//
//
// for (iterator = types.iterator(); iterator.hasNext();) {
// superClass = iterator.next();
//
// Collection<Subscription> subs = local2.get(superClass);
// if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next();
// if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// return subsPerType;
// } else {
// // someone beat us
// return putIfAbsent;
// }
return null;
}
@ -144,45 +146,46 @@ public class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
ConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get();
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
} else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
}
subHolderConcurrent.set(subHolderConcurrent.initialValue());
}
}
return subsPerType;
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
// ConcurrentSet<Subscription> subsPerType = null;
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.get();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
// if (putIfAbsent != null) {
// // someone beat us
// subsPerType = putIfAbsent;
// } else {
// // the message class types are not the same, so look for a common superClass varArg subscription.
// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
// ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
// ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (varargSuperSubscriptions2.contains(sub)) {
// subsPerType.add(sub);
// }
// }
//
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
// }
// }
//
// return subsPerType;
return null;
}
@ -190,46 +193,48 @@ public class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
// ConcurrentSet<Subscription> subsPerType = null;
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.get();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
// if (putIfAbsent != null) {
// // someone beat us
// subsPerType = putIfAbsent;
// } else {
// // the message class types are not the same, so look for a common superClass varArg subscription.
// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
// ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
// ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
// ConcurrentSet<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
// subsPerType.add(sub);
// }
// }
//
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
// }
// }
//
// return subsPerType;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
ConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get();
ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
} else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
ConcurrentSet<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
sub = iterator.next();
if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
subsPerType.add(sub);
}
}
subHolderConcurrent.set(subHolderConcurrent.initialValue());
}
}
return subsPerType;
return null;
}
}

View File

@ -1,22 +1,18 @@
package dorkbox.util.messagebus.common.thread;
import java.util.ArrayList;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionHolder extends ThreadLocal<ConcurrentSet<Subscription>> {
public class SubscriptionHolder extends ThreadLocal<ArrayList<Subscription>> {
private final float loadFactor;
private final int stripeSize;
public SubscriptionHolder(float loadFactor, int stripeSize) {
public SubscriptionHolder() {
super();
this.loadFactor = loadFactor;
this.stripeSize = stripeSize;
}
@Override
public ConcurrentSet<Subscription> initialValue() {
return new ConcurrentSet<Subscription>(16, this.loadFactor, this.stripeSize);
public ArrayList<Subscription> initialValue() {
return new ArrayList<Subscription>();
}
}

View File

@ -6,7 +6,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.omg.CORBA.BooleanHolder;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
@ -41,9 +43,9 @@ public class Subscription {
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
// this.listeners = new StrongConcurrentSetV8<Object>(16, 0.85F);
this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
// this.listeners = new ConcurrentLinkedQueue2();
this.listeners = new StrongConcurrentSetV8<Object>(16, 0.85F, 16);
// this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
// this.listeners = new ConcurrentLinkedQueue2<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
@ -101,10 +103,10 @@ public class Subscription {
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publish(Object message) throws Throwable {
// MethodAccess handler = this.handlerMetadata.getHandler();
// int handleIndex = this.handlerMetadata.getMethodIndex();
// IHandlerInvocation invocation = this.invocation;
public final void publish(final Object message) throws Throwable {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
Object listener;
@ -112,9 +114,9 @@ public class Subscription {
for (iterator = this.listeners.iterator(); iterator.hasNext();) {
listener = iterator.next();
this.c++;
// this.c++;
// invocation.invoke(listener, handler, handleIndex, message);
invocation.invoke(listener, handler, handleIndex, message);
}
}

View File

@ -1,6 +1,7 @@
package dorkbox.util.messagebus.common;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -44,13 +45,13 @@ public class SubscriptionValidator extends AssertSupport{
// we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
Subscription[] subscriptions = manager.getSubscriptionsByMessageType(messageType);
if (subscriptions != null) {
collection.addAll(subscriptions);
collection.addAll(Arrays.asList(subscriptions));
}
Collection<Subscription> superSubs = manager.getSuperSubscriptions(messageType);
Subscription[] superSubs = manager.getSuperSubscriptions(messageType);
if (superSubs != null) {
collection.addAll(superSubs);
collection.addAll(Arrays.asList(superSubs));
}
assertEquals(validationEntries.size(), collection.size());