Added back var arg 2/3, vararg super 2/3. Added unit test for this.

This commit is contained in:
nathan 2015-06-07 23:25:14 +02:00
parent 56728b327d
commit 1287612685
15 changed files with 475 additions and 429 deletions

View File

@ -4,7 +4,11 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
/**
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.
* A message publication starts when an object is send to the bus using one of the its publication methods.
* <p/>
*
* Because the message bus keeps track of classes that are subscribed and published, reloading the classloader means that you will need to
* SHUTDOWN the messagebus when you unload the classloader, and then re-subscribe relevant classes when you reload the classes.
* <p/>
*
* Messages can be published synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* Message handlers can be invoked synchronously or asynchronously depending on their configuration. Thus, there

View File

@ -0,0 +1,183 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
public final class ClassUtils {
private final Map<Class<?>, Class<?>> arrayCache;
private final Map<Class<?>, Class<?>[]> superClassesCache;
public ClassUtils(final float loadFactor) {
this.arrayCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, 1);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 1);
}
/**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
* <p>
* if parameter clazz is of type array, then the super classes are of array type as well
* <p>
* protected by read lock by caller. The cache version is called first, by write lock
*/
public Class<?>[] getSuperClasses(final Class<?> clazz) {
// this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
if (classes == null) {
// publish all super types of class
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
final int length = superTypes.length;
final ArrayList<Class<?>> newList = new ArrayList<Class<?>>(length);
Class<?> c;
final boolean isArray = clazz.isArray();
if (isArray) {
for (int i = 0; i < length; i++) {
c = superTypes[i];
c = getArrayClass(c);
if (c != clazz) {
newList.add(c);
}
}
}
else {
for (int i = 0; i < length; i++) {
c = superTypes[i];
if (c != clazz) {
newList.add(c);
}
}
}
classes = new Class<?>[newList.size()];
newList.toArray(classes);
local.put(clazz, classes);
}
return classes;
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> arrayCache = this.arrayCache;
Class<?> clazz = arrayCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
arrayCache.put(c, clazz);
}
return clazz;
}
/**
* Clears the caches
*/
public void clear() {
this.arrayCache.clear();
this.superClassesCache.clear();
}
public static <T> ArrayList<T> findCommon(final T[] arrayOne, final T[] arrayTwo) {
T[] arrayToHash;
T[] arrayToSearch;
final int size1 = arrayOne.length;
final int size2 = arrayTwo.length;
final int hashSize;
final int searchSize;
if (size1 < size2) {
hashSize = size1;
searchSize = size2;
arrayToHash = arrayOne;
arrayToSearch = arrayTwo;
}
else {
hashSize = size2;
searchSize = size1;
arrayToHash = arrayTwo;
arrayToSearch = arrayOne;
}
final ArrayList<T> intersection = new ArrayList<T>(searchSize);
final HashSet<T> hashedArray = new HashSet<T>();
for (int i = 0; i < hashSize; i++) {
T t = arrayToHash[i];
hashedArray.add(t);
}
for (int i = 0; i < searchSize; i++) {
T t = arrayToSearch[i];
if (hashedArray.contains(t)) {
intersection.add(t);
}
}
return intersection;
}
public static <T> ArrayList<T> findCommon(final ArrayList<T> arrayOne, final ArrayList<T> arrayTwo) {
ArrayList<T> arrayToHash;
ArrayList<T> arrayToSearch;
final int size1 = arrayOne.size();
final int size2 = arrayTwo.size();
final int hashSize;
final int searchSize;
if (size1 < size2) {
hashSize = size1;
searchSize = size2;
arrayToHash = arrayOne;
arrayToSearch = arrayTwo;
}
else {
hashSize = size2;
searchSize = size1;
arrayToHash = arrayTwo;
arrayToSearch = arrayOne;
}
ArrayList<T> intersection = new ArrayList<T>(searchSize);
HashSet<T> hashedArray = new HashSet<T>();
for (int i = 0; i < hashSize; i++) {
T t = arrayToHash.get(i);
hashedArray.add(t);
}
for (int i = 0; i < searchSize; i++) {
T t = arrayToSearch.get(i);
if (hashedArray.contains(t)) {
intersection.add(t);
}
}
return intersection;
}
}

View File

@ -328,7 +328,7 @@ public class HashMapTree<KEY, VALUE> {
return tree.value;
}
public final VALUE getValue(KEY key1, KEY key2, KEY key3) {
public final VALUE get(KEY key1, KEY key2, KEY key3) {
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf(key1);

View File

@ -74,7 +74,7 @@ public class MessageHandler {
private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes;
private final boolean acceptsVarArgs;
private final Class<?> varArgClass;
private final boolean isSynchronized;
@ -94,7 +94,12 @@ public class MessageHandler {
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
this.acceptsVarArgs = handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs();
if (handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs()) {
this.varArgClass = handledMessages[0].getComponentType();
}
else {
this.varArgClass = null;
}
}
public final boolean isSynchronized() {
@ -113,12 +118,16 @@ public class MessageHandler {
return this.handledMessages;
}
public final Class<?> getVarArgClass() {
return this.varArgClass;
}
public final boolean acceptsSubtypes() {
return this.acceptsSubtypes;
}
public final boolean acceptsVarArgs() {
return this.acceptsVarArgs;
return this.varArgClass != null;
}
@Override

View File

@ -1,96 +0,0 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
public class SuperClassUtils {
private final Map<Class<?>, Class<?>> versionCache;
private final Map<Class<?>, Class<?>[]> superClassesCache;
public SuperClassUtils(float loadFactor, int stripeSize) {
this.versionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, stripeSize);
}
/**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
* <p>
* if parameter clazz is of type array, then the super classes are of array type as well
* <p>
* protected by read lock by caller. The cache version is called first, by write lock
*/
public final Class<?>[] getSuperClasses(final Class<?> clazz) {
// this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
if (classes == null) {
// publish all super types of class
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
final int length = superTypes.length;
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(length);
Class<?> c;
final boolean isArray = clazz.isArray();
if (isArray) {
for (int i = 0; i < length; i++) {
c = superTypes[i];
c = getArrayClass(c);
if (c != clazz) {
newList.add(c);
}
}
}
else {
for (int i = 0; i < length; i++) {
c = superTypes[i];
if (c != clazz) {
newList.add(c);
}
}
}
classes = new Class<?>[newList.size()];
newList.toArray(classes);
local.put(clazz, classes);
}
return classes;
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public final Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> versionCache = this.versionCache;
Class<?> clazz = versionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
versionCache.put(c, clazz);
}
return clazz;
}
/**
* Clears the caches on shutdown
*/
public final void shutdown() {
this.versionCache.clear();
this.superClassesCache.clear();
}
}

View File

@ -1,57 +1,50 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionUtils;
import java.util.ArrayList;
import java.util.Map;
public final class VarArgUtils {
private final Map<Class<?>, ArrayList<Subscription>> varArgSubscriptions;
private final Map<Class<?>, ArrayList<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, ArrayList<Subscription>> varArgSuperClassSubscriptionsMulti;
private final Map<Class<?>, ArrayList<Subscription>> varArgSubscriptionsSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> varArgSubscriptionsMulti;
private final SubscriptionHolder subHolderConcurrent;
private final Map<Class<?>, ArrayList<Subscription>> varArgSuperSubscriptionsSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> varArgSuperSubscriptionsMulti;
private final float loadFactor;
private final int stripeSize;
private final SubscriptionUtils utils;
private final SuperClassUtils superClassUtils;
private final ClassUtils superClassUtils;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils,
Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) {
public VarArgUtils(final ClassUtils superClassUtils, final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
final float loadFactor, final int stripeSize) {
this.utils = utils;
this.superClassUtils = superClassUtils;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.loadFactor = loadFactor;
this.stripeSize = stripeSize;
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.varArgSubscriptionsSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.subHolderConcurrent = new SubscriptionHolder();
this.varArgSuperSubscriptionsSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
}
public void clear() {
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
this.varArgSuperClassSubscriptionsMulti.clear();
this.varArgSubscriptionsSingle.clear();
this.varArgSubscriptionsMulti.clear();
this.varArgSuperSubscriptionsSingle.clear();
this.varArgSuperSubscriptionsMulti.clear();
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Subscription[] getVarArgSubscriptions(Class<?> messageClass) {
public Subscription[] getVarArgSubscriptions(final Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions;
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptionsSingle;
ArrayList<Subscription> varArgSubs = local.get(messageClass);
@ -68,7 +61,7 @@ public final class VarArgUtils {
for (int i = 0; i < length; i++) {
sub = subs.get(i);
if (sub.acceptsVarArgs()) {
if (sub.getHandler().acceptsVarArgs()) {
varArgSubs.add(sub);
}
}
@ -84,6 +77,7 @@ public final class VarArgUtils {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
@ -95,9 +89,10 @@ public final class VarArgUtils {
return subscriptions;
}
// CAN NOT RETURN NULL
private ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptions;
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsSingle;
ArrayList<Subscription> varArgSuperSubs = local.get(messageClass);
@ -119,6 +114,7 @@ public final class VarArgUtils {
Subscription sub;
ArrayList<Subscription> subs;
int length;
MessageHandler handlerMetadata;
for (int i = 0; i < typesLength; i++) {
type = types[i];
@ -131,7 +127,8 @@ public final class VarArgUtils {
for (int j = 0; j < length; j++) {
sub = subs.get(j);
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
handlerMetadata = sub.getHandler();
if (handlerMetadata.acceptsSubtypes() && handlerMetadata.acceptsVarArgs()) {
varArgSuperSubs.add(sub);
}
}
@ -151,42 +148,24 @@ public final class VarArgUtils {
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
ArrayList<Subscription> subsPerType;
ArrayList<Subscription> subs = local.get(messageClass1, messageClass2);
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
}
else {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final int size1 = varargSuperSubscriptions1.size();
final int size2 = varargSuperSubscriptions2.size();
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
subsPerType = new ArrayList<Subscription>(size1 + size2);
Subscription sub;
for (int i = 0; i < size1; i++) {
sub = varargSuperSubscriptions1.get(i);
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
}
subsPerType.trimToSize();
local.put(subsPerType, messageClass1, messageClass2);
subs.trimToSize();
local.put(subs, messageClass1, messageClass2);
}
final Subscription[] subscriptions = new Subscription[subsPerType.size()];
subsPerType.toArray(subscriptions);
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
}
@ -197,43 +176,28 @@ public final class VarArgUtils {
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
ArrayList<Subscription> subsPerType;
ArrayList<Subscription> subs = local.get(messageClass1, messageClass2, messageClass3);
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
}
else {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3);
final int size1 = varargSuperSubscriptions1.size();
final int size2 = varargSuperSubscriptions2.size();
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3);
subsPerType = new ArrayList<Subscription>(size1 + size2);
Subscription sub;
for (int i = 0; i < size1; i++) {
sub = varargSuperSubscriptions1.get(i);
if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
subsPerType.add(sub);
}
}
subsPerType.trimToSize();
local.put(subsPerType, messageClass1, messageClass2, messageClass3);
subs.trimToSize();
local.put(subs, messageClass1, messageClass2, messageClass3);
}
final Subscription[] subscriptions = new Subscription[subsPerType.size()];
subsPerType.toArray(subscriptions);
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
}
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author dorkbox, llc
* Date: 2/2/15
*/
public class Subscription {
public final class Subscription {
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
public final int ID = ID_COUNTER.getAndIncrement();
@ -56,43 +56,31 @@ public class Subscription {
this.invocation = invocation;
}
public final MessageHandler getHandlerMetadata() {
public MessageHandler getHandler() {
return handlerMetadata;
}
public final Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
public final boolean acceptsSubtypes() {
return this.handlerMetadata.acceptsSubtypes();
}
public final boolean acceptsVarArgs() {
return this.handlerMetadata.acceptsVarArgs();
}
public final boolean isEmpty() {
public boolean isEmpty() {
return this.listeners.isEmpty();
}
public final void subscribe(Object listener) {
public void subscribe(Object listener) {
this.listeners.add(listener);
}
/**
* @return TRUE if the element was removed
*/
public final boolean unsubscribe(Object existingListener) {
public boolean unsubscribe(Object existingListener) {
return this.listeners.remove(existingListener);
}
// only used in unit-test
public final int size() {
public int size() {
return this.listeners.size();
}
public final void publish(final Object message) throws Throwable {
public void publish(final Object message) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
@ -107,7 +95,7 @@ public class Subscription {
}
}
public final void publish(final Object message1, final Object message2) throws Throwable {
public void publish(final Object message1, final Object message2) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
@ -122,7 +110,7 @@ public class Subscription {
}
}
public final void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
@ -137,7 +125,7 @@ public class Subscription {
}
}
public final void publishToSubscription(final Object... messages) throws Throwable {
public void publishToSubscription(final Object... messages) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
@ -154,12 +142,12 @@ public class Subscription {
@Override
public final int hashCode() {
public int hashCode() {
return this.ID;
}
@Override
public final boolean equals(Object obj) {
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
@ -233,7 +221,6 @@ public class Subscription {
}
subs.add(this);
return;
}
}
}

View File

@ -29,7 +29,6 @@ import java.util.concurrent.locks.StampedLock;
public final class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
private final SubscriptionUtils utils;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
@ -49,7 +48,8 @@ public final class SubscriptionManager {
// once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final ClassUtils classUtils;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock();
@ -69,14 +69,14 @@ public final class SubscriptionManager {
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(32, LOAD_FACTOR, 1);
}
final SuperClassUtils superClass = new SuperClassUtils(LOAD_FACTOR, 1);
classUtils = new ClassUtils(LOAD_FACTOR);
this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, LOAD_FACTOR,
numberOfThreads);
this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti,
LOAD_FACTOR, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
}
public void shutdown() {
@ -88,7 +88,7 @@ public final class SubscriptionManager {
clearConcurrentCollections();
this.utils.shutdown();
this.classUtils.clear();
}
public void subscribe(final Object listener) {
@ -144,7 +144,6 @@ public final class SubscriptionManager {
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
final SubscriptionUtils utils = this.utils;
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
@ -211,7 +210,7 @@ public final class SubscriptionManager {
private void clearConcurrentCollections() {
this.utils.clear();
this.subUtils.clear();
this.varArgUtils.clear();
}
@ -326,7 +325,7 @@ public final class SubscriptionManager {
ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
@ -355,7 +354,7 @@ public final class SubscriptionManager {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.utils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
@ -386,7 +385,7 @@ public final class SubscriptionManager {
.get(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.utils
final ArrayList<Subscription> superSubs = this.subUtils
.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // NOT return null
if (collection != null) {
@ -594,80 +593,79 @@ public final class SubscriptionManager {
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
// publish to var arg, only if not already an array
if (varArgPossibility.get() && !isArray) {
stamp = lock.readLock();
final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass); // can return null
lock.unlockRead(stamp);
if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
}
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub.publish(asArray);
}
// publish to var arg, only if not already an array (because that would be unnecessary)
if (varArgPossibility.get() && !isArray) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
lock.unlockRead(stamp);
Subscription sub;
int length = varArgSubs.length;
Object[] asArray = null;
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
else {
stamp = lock.readLock();
if (length > 1) {
hasSubs = true;
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);
lock.unlockRead(stamp);
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);// CAN NOT RETURN NULL
lock.unlockRead(stamp);
length = varArgSuperSubs.length;
if (length > 1) {
hasSubs = true;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
}
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
sub.publish(asArray);
}
}
return;
}
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
if (!hasSubs) {
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
@ -676,67 +674,77 @@ public final class SubscriptionManager {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && messageClass1 == messageClass2 && !messageClass1.isArray()) {
long stamp = lock.readLock();
final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray()) {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub = varArgSubs[i];
sub.publish(asArray);
}
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
else {
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils
.getVarArgSuperSubscriptions(messageClass1, messageClass2); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 2);
asArray[0] = message1;
asArray[1] = message2;
sub.publish(asArray);
}
}
}
else {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
@ -756,69 +764,81 @@ public final class SubscriptionManager {
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subs = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
if (subs != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
for (int i = 0; i < subs.length; i++) {
sub = subs[i];
sub.publish(message1, message2, message3);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) {
long stamp = lock.readLock();
final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray() && !messageClass3.isArray()) {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub = varArgSubs[i];
sub.publish(asArray);
}
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
else {
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils
.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
sub.publish(asArray);
}
}
}
else {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {

View File

@ -1,8 +1,8 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.ClassUtils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.SuperClassUtils;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
@ -10,7 +10,7 @@ import java.util.ArrayList;
import java.util.Map;
public final class SubscriptionUtils {
private final SuperClassUtils superClass;
private final ClassUtils superClass;
private final ClassHolder classHolderSingle;
@ -28,9 +28,9 @@ public final class SubscriptionUtils {
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
public SubscriptionUtils(SuperClassUtils superClass, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, float loadFactor,
int stripeSize) {
public SubscriptionUtils(final ClassUtils superClass, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, final float loadFactor,
final int stripeSize) {
this.superClass = superClass;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
@ -50,11 +50,7 @@ public final class SubscriptionUtils {
public void clear() {
this.superClassSubscriptions.clear();
}
public void shutdown() {
this.superClass.shutdown();
this.superClassSubscriptionsMulti.clear();
}
@ -71,9 +67,9 @@ public final class SubscriptionUtils {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
ArrayList<Subscription> superSubscriptions = local.get(clazz);
ArrayList<Subscription> subs = local.get(clazz);
if (superSubscriptions == null) {
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
@ -86,7 +82,7 @@ public final class SubscriptionUtils {
final int length = superClasses.length;
int superSubLength;
superSubscriptions = new ArrayList<Subscription>(length);
subs = new ArrayList<Subscription>(length);
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
@ -97,18 +93,18 @@ public final class SubscriptionUtils {
for (int j = 0; j < superSubLength; j++) {
sub = superSubs.get(j);
if (sub.acceptsSubtypes()) {
superSubscriptions.add(sub);
if (sub.getHandler().acceptsSubtypes()) {
subs.add(sub);
}
}
}
}
superSubscriptions.trimToSize();
local.put(clazz, superSubscriptions);
subs.trimToSize();
local.put(clazz, subs);
}
return superSubscriptions;
return subs;
}
/**
@ -122,12 +118,11 @@ public final class SubscriptionUtils {
*/
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2) {
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
HashMapTree<Class<?>, ArrayList<Subscription>> subsLeaf = local.getLeaf(clazz1, clazz2);
ArrayList<Subscription> subs;
ArrayList<Subscription> subs = local.get(clazz1, clazz2);
if (subsLeaf == null) {
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final HashMapTree<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageMulti;
@ -166,7 +161,7 @@ public final class SubscriptionUtils {
for (int k = 0; k < superSubs.size(); k++) {
sub = superSubs.get(k);
if (sub.acceptsSubtypes()) {
if (sub.getHandler().acceptsSubtypes()) {
subs.add(sub);
}
}
@ -176,10 +171,6 @@ public final class SubscriptionUtils {
subs.trimToSize();
local.put(subs, clazz1, clazz2);
}
else {
// if the leaf exists, then the value exists.
subs = subsLeaf.getValue();
}
return subs;
}
@ -193,14 +184,13 @@ public final class SubscriptionUtils {
*
* @return CAN NOT RETURN NULL
*/
public ArrayList<Subscription> getSuperSubscriptions(Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Class<?> clazz3) {
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
HashMapTree<Class<?>, ArrayList<Subscription>> subsLeaf = local.getLeaf(clazz1, clazz2, clazz3);
ArrayList<Subscription> subs;
ArrayList<Subscription> subs = local.get(clazz1, clazz2, clazz3);
if (subsLeaf == null) {
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final HashMapTree<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageMulti;
@ -250,7 +240,7 @@ public final class SubscriptionUtils {
for (int m = 0; m < superSubs.size(); m++) {
sub = superSubs.get(m);
if (sub.acceptsSubtypes()) {
if (sub.getHandler().acceptsSubtypes()) {
subs.add(sub);
}
}
@ -261,10 +251,6 @@ public final class SubscriptionUtils {
subs.trimToSize();
local.put(subs, clazz1, clazz2, clazz3);
}
else {
// if the leaf exists, then the value exists.
subs = subsLeaf.getValue();
}
return subs;
}

View File

@ -1,15 +1,12 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
*
* @author bennidi
@ -21,6 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
public void testSingleThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MultiMBassador();
fifoBUs.start();
List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){

View File

@ -1,15 +1,6 @@
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
import dorkbox.util.messagebus.common.MessageManager;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.listeners.ExceptionThrowingListener;
@ -19,6 +10,9 @@ import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
@ -108,6 +102,7 @@ public class MBassadorTest extends MessageBusTest {
final MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(ExceptionCounter);
bus.start();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
@ -134,8 +129,8 @@ public class MBassadorTest extends MessageBusTest {
while (bus.hasPendingMessages()) {
pause(10);
}
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
bus.shutdown();
}

View File

@ -20,6 +20,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSending(){
IMessageBus bus = new MultiMBassador();
bus.start();
Listener listener1 = new Listener();
bus.subscribe(listener1);
@ -39,11 +40,11 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish("s"); // 4
bus.publish("s", "s"); // 3
bus.publish("s", "s", "s"); // 3
bus.publish(1, "s");
bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
bus.publish(1, "s"); // 1
bus.publish(1, 2, "s"); // 2
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2
assertEquals(13, count.get());
assertEquals(15, count.get());
count.set(0);
@ -58,9 +59,6 @@ public class MultiMessageTest extends MessageBusTest {
try {
Thread.sleep(ConcurrentUnits);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}

View File

@ -15,10 +15,9 @@
*/
package dorkbox.util.messagebus;
import org.junit.Test;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.HashMapTree;
import org.junit.Test;
public class ObjectTreeTest extends AssertSupport {
@ -34,7 +33,7 @@ public class ObjectTreeTest extends AssertSupport {
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
tree.put(string, clazz1, clazz2, clazz3);
assertEquals(string, tree.getValue(clazz1, clazz2, clazz3));
assertEquals(string, tree.get(clazz1, clazz2, clazz3));
}
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?>... clazzes) {

View File

@ -1,12 +1,5 @@
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
@ -19,6 +12,10 @@ import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
@ -32,6 +29,7 @@ public class SyncBusTest extends MessageBusTest {
public void testSynchronousMessagePublication() throws Exception {
final IMessageBus bus = new MultiMBassador();
bus.start();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -86,6 +84,7 @@ public class SyncBusTest extends MessageBusTest {
final IMessageBus bus = new MultiMBassador();
bus.addErrorHandler(ExceptionCounter);
bus.start();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -1,13 +1,11 @@
package dorkbox.util.messagebus.common;
import junit.framework.Assert;
import org.junit.Before;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.messages.MessageTypes;
import junit.framework.Assert;
import org.junit.Before;
/**
* A base test that provides a factory for message bus that makes tests fail if any
@ -44,6 +42,7 @@ public abstract class MessageBusTest extends AssertSupport {
public MultiMBassador createBus() {
MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(TestFailingHandler);
bus.start();
return bus;
}
@ -51,6 +50,7 @@ public abstract class MessageBusTest extends AssertSupport {
MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
bus.start();
return bus;
}
}