WIP - added varArg Multi version for getting subscriptions, ie: int, int, string -> object[]

This commit is contained in:
nathan 2015-05-10 17:09:38 +02:00
parent dd35477346
commit 8d8bc1512f
6 changed files with 276 additions and 171 deletions

View File

@ -58,7 +58,7 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
this(false, numberOfThreads); this(true, numberOfThreads);
} }
/** /**
@ -216,7 +216,7 @@ public class MultiMBassador implements IMessageBus {
// publish to var arg, only if not already an array // publish to var arg, only if not already an array
if (!messageClass.isArray()) { if (!manager.isArray(messageClass)) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass); StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
@ -312,7 +312,7 @@ public class MultiMBassador implements IMessageBus {
} }
// publish to var arg, only if not already an array // publish to var arg, only if not already an array
if (messageClass1 == messageClass2 && !messageClass1.isArray()) { if (messageClass1 == messageClass2) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
@ -345,6 +345,27 @@ public class MultiMBassador implements IMessageBus {
sub = current.getValue(); sub = current.getValue();
current = current.next(); current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
}
} else {
StrongConcurrentSetV8<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
current = varargSuperMultiSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// since each sub will be for the "lowest common demoninator", we have to re-create
// this array from the componentType every time -- since it will be different
Class<?> componentType = sub.getHandledMessageTypes()[0].getComponentType();
Object[] asArray = (Object[]) Array.newInstance(componentType, 2);
asArray[0] = message1;
asArray[1] = message2;
// this catches all exception types // this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }
@ -412,8 +433,8 @@ public class MultiMBassador implements IMessageBus {
} }
} }
// publish to var arg, only if not already an array // publish to var arg, only if not already an array, and all of the same type
if (messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) { if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
@ -447,6 +468,28 @@ public class MultiMBassador implements IMessageBus {
sub = current.getValue(); sub = current.getValue();
current = current.next(); current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
}
} else {
StrongConcurrentSetV8<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
current = varargSuperMultiSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// since each sub will be for the "lowest common demoninator", we have to re-create
// this array from the componentType every time -- since it will be different
Class<?> componentType = sub.getHandledMessageTypes()[0].getComponentType();
Object[] asArray = (Object[]) Array.newInstance(componentType, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
// this catches all exception types // this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray); subsPublished |= sub.publishToSubscription(this, asArray);
} }

View File

@ -2,7 +2,6 @@ package dorkbox.util.messagebus;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -12,7 +11,6 @@ import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.ReflectionUtils; import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SubscriptionHolder; import dorkbox.util.messagebus.common.SubscriptionHolder;
import dorkbox.util.messagebus.common.SuperClassIterator;
import dorkbox.util.messagebus.listener.MessageHandler; import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader; import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
@ -61,6 +59,7 @@ public class SubscriptionManager {
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerListener; private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerListener;
private final Map<Class<?>, Class<?>> arrayVersionCache; private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final Map<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache; private final Map<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
@ -71,6 +70,7 @@ public class SubscriptionManager {
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSubscriptions; private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSubscriptions;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptions; private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptionsMulti;
// stripe size of maps for concurrency // stripe size of maps for concurrency
private final int STRIPE_SIZE; private final int STRIPE_SIZE;
@ -96,6 +96,7 @@ public class SubscriptionManager {
// modified by N threads // modified by N threads
{ {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
@ -107,6 +108,7 @@ public class SubscriptionManager {
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
} }
this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1);
@ -121,11 +123,10 @@ public class SubscriptionManager {
this.subscriptionsPerListener.clear(); this.subscriptionsPerListener.clear();
this.superClassesCache.clear(); this.superClassesCache.clear();
this.superClassSubscriptions.clear();
this.arrayVersionCache.clear(); this.arrayVersionCache.clear();
this.varArgSubscriptions.clear(); this.isArrayCache.clear();
this.varArgSuperClassSubscriptions.clear();
clearConcurrentCollections();
} }
public void subscribe(Object listener) { public void subscribe(Object listener) {
@ -141,9 +142,7 @@ public class SubscriptionManager {
} }
// these are concurrent collections // these are concurrent collections
this.superClassSubscriptions.clear(); clearConcurrentCollections();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
@ -170,7 +169,7 @@ public class SubscriptionManager {
messageHandler = current.getValue(); messageHandler = current.getValue();
current = current.next(); current = current.next();
Collection<Subscription> subsPerType = null; StrongConcurrentSetV8<Subscription> subsPerType = null;
// now add this subscription to each of the handled types // now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages(); Class<?>[] types = messageHandler.getHandledMessages();
@ -183,44 +182,44 @@ public class SubscriptionManager {
} else { } else {
subsPerType = this.subHolderConcurrent.get(); subsPerType = this.subHolderConcurrent.get();
this.subHolderConcurrent.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE)); this.subHolderConcurrent.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE));
getSuperClass(types[0]); getSuperClasses(types[0]);
} }
break; break;
} }
case 2: { case 2: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
getSuperClass(types[0]); getSuperClasses(types[0]);
getSuperClass(types[1]); getSuperClasses(types[1]);
} }
break; break;
} }
case 3: { case 3: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
getSuperClass(types[0]); getSuperClasses(types[0]);
getSuperClass(types[1]); getSuperClasses(types[1]);
getSuperClass(types[2]); getSuperClasses(types[2]);
} }
break; break;
} }
default: { default: {
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = this.subHolder.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1));
for (Class<?> c : types) { for (Class<?> c : types) {
getSuperClass(c); getSuperClasses(c);
} }
} }
break; break;
@ -263,9 +262,8 @@ public class SubscriptionManager {
} }
// these are concurrent collections // these are concurrent collections
this.superClassSubscriptions.clear(); clearConcurrentCollections();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
StrongConcurrentSetV8<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass); StrongConcurrentSetV8<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) { if (subscriptions != null) {
@ -280,12 +278,19 @@ public class SubscriptionManager {
} }
} }
private void clearConcurrentCollections() {
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
this.varArgSuperClassSubscriptionsMulti.clear();
}
/** /**
* race conditions will result in duplicate answers, which we don't care if happens * race conditions will result in duplicate answers, which we don't care if happens
* never returns null * never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/ */
private StrongConcurrentSetV8<Class<?>> getSuperClass(Class<?> clazz) { private StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz) {
// this is never reset, since it never needs to be. // this is never reset, since it never needs to be.
Map<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache; Map<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache;
@ -311,6 +316,7 @@ public class SubscriptionManager {
// race conditions will result in duplicate answers, which we don't care if happens // race conditions will result in duplicate answers, which we don't care if happens
local.put(clazz, set); local.put(clazz, set);
return set;
} }
return superTypes; return superTypes;
@ -333,6 +339,21 @@ public class SubscriptionManager {
return clazz; return clazz;
} }
/**
* Cache the values of JNI method, isArray(c)
* @return true if the class c is an array type
*/
@SuppressWarnings("boxing")
public final boolean isArray(Class<?> c) {
Boolean isArray = this.isArrayCache.get(c);
if (isArray == null) {
boolean b = c.isArray();
this.isArrayCache.put(c, b);
return b;
}
return isArray;
}
// CAN RETURN NULL // CAN RETURN NULL
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
@ -376,7 +397,6 @@ public class SubscriptionManager {
StrongConcurrentSetV8<Subscription> subs = local2.get(arrayVersion); StrongConcurrentSetV8<Subscription> subs = local2.get(arrayVersion);
if (subs != null && !subs.isEmpty()) { if (subs != null && !subs.isEmpty()) {
current = subs.head; current = subs.head;
while (current != null) { while (current != null) {
sub = current.getValue(); sub = current.getValue();
@ -396,7 +416,7 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
// CAN RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) { public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
@ -408,7 +428,7 @@ public class SubscriptionManager {
if (subsPerType == null) { if (subsPerType == null) {
// this caches our array type. This is never cleared. // this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType); Class<?> arrayVersion = getArrayClass(varArgType);
StrongConcurrentSetV8<Class<?>> types = getSuperClass(arrayVersion); StrongConcurrentSetV8<Class<?>> types = getSuperClasses(arrayVersion);
if (types.isEmpty()) { if (types.isEmpty()) {
local.put(varArgType, EMPTY_SUBS); local.put(varArgType, EMPTY_SUBS);
return null; return null;
@ -430,7 +450,6 @@ public class SubscriptionManager {
StrongConcurrentSetV8<Subscription> subs = local2.get(superClass); StrongConcurrentSetV8<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) { if (subs != null && !subs.isEmpty()) {
current = subs.head; current = subs.head;
while (current != null) { while (current != null) {
sub = current.getValue(); sub = current.getValue();
@ -451,6 +470,82 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
StrongConcurrentSetV8<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
StrongConcurrentSetV8<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
} else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
subsPerType = new StrongConcurrentSetV8<Subscription>(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ISetEntry<Subscription> current;
Subscription sub;
current = varargSuperSubscriptions1.head;
while (current != null) {
sub = current.getValue();
current = current.next();
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
}
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public final StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType) { public final StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType) {
@ -461,7 +556,7 @@ public class SubscriptionManager {
if (subsPerType == null) { if (subsPerType == null) {
// this caches our class hierarchy. This is never cleared. // this caches our class hierarchy. This is never cleared.
StrongConcurrentSetV8<Class<?>> types = getSuperClass(superType); StrongConcurrentSetV8<Class<?>> types = getSuperClasses(superType);
if (types.isEmpty()) { if (types.isEmpty()) {
local.put(superType, EMPTY_SUBS); local.put(superType, EMPTY_SUBS);
return null; return null;
@ -473,8 +568,9 @@ public class SubscriptionManager {
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
ISetEntry<Class<?>> current1; ISetEntry<Class<?>> current1 = null;
Class<?> superClass; Class<?> superClass;
current1 = types.head; current1 = types.head;
while (current1 != null) { while (current1 != null) {
superClass = current1.getValue(); superClass = current1.getValue();
@ -528,26 +624,19 @@ public class SubscriptionManager {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2;
// ISetEntry<Class<?>> current1; ISetEntry<Class<?>> current1 = null;
// Class<?> eventSuperType1;
//
// current1 = types1.head;
// while (current1 != null) {
// eventSuperType1 = current1.getValue();
// current1 = current1.next();
//
// }
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Class<?> eventSuperType1; Class<?> eventSuperType1;
ISetEntry<Class<?>> current2 = null;
Class<?> eventSuperType2; Class<?> eventSuperType2;
while (iterator1.hasNext()) { if (types1 != null) {
eventSuperType1 = iterator1.next(); current1 = types1.head;
}
while (current1 != null) {
eventSuperType1 = current1.getValue();
current1 = current1.next();
boolean type1Matches = eventSuperType1 == superType1; boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) { if (type1Matches) {
continue; continue;
@ -555,10 +644,13 @@ public class SubscriptionManager {
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) { if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2); if (types2 != null) {
current2 = types2.head;
}
while (current2 != null) {
eventSuperType2 = current2.getValue();
current2 = current2.next();
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) { if (type1Matches && eventSuperType2 == superType2) {
continue; continue;
} }
@ -604,27 +696,33 @@ public class SubscriptionManager {
// if the leaf exists, then the value exists. // if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue(); subsPerType = subsPerTypeLeaf.getValue();
} else { } else {
Collection<Class<?>> types1 = this.superClassesCache.get(superType1); StrongConcurrentSetV8<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2); StrongConcurrentSetV8<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Class<?>> types3 = this.superClassesCache.get(superType3); StrongConcurrentSetV8<Class<?>> types3 = this.superClassesCache.get(superType3);
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE); subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs; StrongConcurrentSetV8<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf3; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf3;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1); ISetEntry<Class<?>> current1 = null;
Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3;
Class<?> eventSuperType1; Class<?> eventSuperType1;
ISetEntry<Class<?>> current2 = null;
Class<?> eventSuperType2; Class<?> eventSuperType2;
ISetEntry<Class<?>> current3 = null;
Class<?> eventSuperType3; Class<?> eventSuperType3;
while (iterator1.hasNext()) { if (types1 != null) {
eventSuperType1 = iterator1.next(); current1 = types1.head;
}
while (current1 != null) {
eventSuperType1 = current1.getValue();
current1 = current1.next();
boolean type1Matches = eventSuperType1 == superType1; boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) { if (type1Matches) {
continue; continue;
@ -632,10 +730,13 @@ public class SubscriptionManager {
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) { if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2); if (types2 != null) {
current2 = types2.head;
}
while (current2 != null) {
eventSuperType2 = current2.getValue();
current2 = current2.next();
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2; boolean type12Matches = type1Matches && eventSuperType2 == superType2;
if (type12Matches) { if (type12Matches) {
continue; continue;
@ -644,10 +745,13 @@ public class SubscriptionManager {
leaf2 = leaf1.getLeaf(eventSuperType2); leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) { if (leaf2 != null) {
iterator3 = new SuperClassIterator(superType3, types3); if (types3 != null) {
current3 = types3.head;
}
while (current3 != null) {
eventSuperType3 = current3.getValue();
current3 = current3.next();
while (iterator3.hasNext()) {
eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) { if (type12Matches && eventSuperType3 == superType3) {
continue; continue;
} }
@ -679,4 +783,6 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
} }

View File

@ -1,54 +0,0 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.Iterator;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
public class SuperClassIterator implements Iterator<Class<?>> {
private final Iterator<Class<?>> iterator;
private Class<?> clazz;
public SuperClassIterator(Class<?> clazz, Collection<Class<?>> types) {
this.clazz = clazz;
if (types != null) {
this.iterator = types.iterator();
} else {
this.iterator = null;
}
}
@Override
public boolean hasNext() {
if (this.clazz != null) {
return true;
}
if (this.iterator != null) {
return this.iterator.hasNext();
}
return false;
}
@Override
public Class<?> next() {
if (this.clazz != null) {
Class<?> clazz2 = this.clazz;
this.clazz = null;
return clazz2;
}
if (this.iterator != null) {
return this.iterator.next();
}
return null;
}
@Override
public void remove() {
throw new NotImplementedException();
}
}

View File

@ -92,6 +92,10 @@ public class Subscription {
return this.listeners.size(); return this.listeners.size();
} }
public int count =0;
public int getCount() {
return count;
}
/** /**
* @return true if there were listeners for this publication, false if there was nothing * @return true if there were listeners for this publication, false if there was nothing
*/ */
@ -109,43 +113,43 @@ public class Subscription {
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
count++;
try { // try {
invocation.invoke(listener, handler, handleIndex, message); // invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) { // } catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"The class or method is not accessible") // "The class or method is not accessible")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (IllegalArgumentException e) { // } catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass() // "Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0]) // + "Expected: " + handler.getParameterTypes()[0])
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (InvocationTargetException e) { // } catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"Message handler threw exception") // "Message handler threw exception")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (Throwable e) { // } catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"The handler code threw an exception") // "The handler code threw an exception")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} // }
} }
return true; return true;
} }

View File

@ -42,7 +42,7 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish(1, 2, "s"); bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(12, count.get()); assertEquals(13, count.get());
count.set(0); count.set(0);
@ -52,7 +52,17 @@ public class MultiMessageTest extends MessageBusTest {
bus.publishAsync(1, 2, "s"); bus.publishAsync(1, 2, "s");
bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6}); bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(12, count.get()); while (bus.hasPendingMessages()) {
try {
Thread.sleep(ConcurrentUnits);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}
assertEquals(13, count.get());
bus.shutdown(); bus.shutdown();

View File

@ -20,8 +20,6 @@ public class PerformanceTest {
public static final int CONCURRENCY_LEVEL = 2; public static final int CONCURRENCY_LEVEL = 2;
private static long count = 0;
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override @Override
public void handleError(PublicationError error) { public void handleError(PublicationError error) {
@ -42,15 +40,14 @@ public class PerformanceTest {
ConcurrentExecutor.runConcurrent(new Runnable() { ConcurrentExecutor.runConcurrent(new Runnable() {
@Override @Override
public void run() { public void run() {
long num = 0; Long num = Long.valueOf(7L);
while (num < Long.MAX_VALUE) { while (true) {
bus.publishAsync(num++); bus.publishAsync(num);
} }
}}, CONCURRENCY_LEVEL); }}, CONCURRENCY_LEVEL);
bus.shutdown(); bus.shutdown();
System.err.println("Count: " + count);
} }
public PerformanceTest() { public PerformanceTest() {
@ -61,7 +58,6 @@ public class PerformanceTest {
@Handler @Handler
public void handleSync(Long o1) { public void handleSync(Long o1) {
// System.err.println(Long.toString(o1)); // System.err.println(Long.toString(o1));
count++;
} }
} }
} }