diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 43434fe..aae8f41 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -58,7 +58,7 @@ public class MultiMBassador implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MultiMBassador(int numberOfThreads) { - this(false, numberOfThreads); + this(true, numberOfThreads); } /** @@ -216,7 +216,7 @@ public class MultiMBassador implements IMessageBus { // publish to var arg, only if not already an array - if (!messageClass.isArray()) { + if (!manager.isArray(messageClass)) { Object[] asArray = null; StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass); @@ -312,7 +312,7 @@ public class MultiMBassador implements IMessageBus { } // publish to var arg, only if not already an array - if (messageClass1 == messageClass2 && !messageClass1.isArray()) { + if (messageClass1 == messageClass2) { Object[] asArray = null; StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); @@ -345,6 +345,27 @@ public class MultiMBassador implements IMessageBus { sub = current.getValue(); current = current.next(); + // this catches all exception types + subsPublished |= sub.publishToSubscription(this, asArray); + } + } + } else { + StrongConcurrentSetV8 varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); + + // now get array based superClasses (but only if those ALSO accept vararg) + if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { + current = varargSuperMultiSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + + // since each sub will be for the "lowest common demoninator", we have to re-create + // this array from the componentType every time -- since it will be different + Class componentType = sub.getHandledMessageTypes()[0].getComponentType(); + Object[] asArray = (Object[]) Array.newInstance(componentType, 2); + asArray[0] = message1; + asArray[1] = message2; + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } @@ -412,8 +433,8 @@ public class MultiMBassador implements IMessageBus { } } - // publish to var arg, only if not already an array - if (messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) { + // publish to var arg, only if not already an array, and all of the same type + if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { Object[] asArray = null; StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { @@ -447,6 +468,28 @@ public class MultiMBassador implements IMessageBus { sub = current.getValue(); current = current.next(); + // this catches all exception types + subsPublished |= sub.publishToSubscription(this, asArray); + } + } + } else { + StrongConcurrentSetV8 varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); + + // now get array based superClasses (but only if those ALSO accept vararg) + if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { + current = varargSuperMultiSubscriptions.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + + // since each sub will be for the "lowest common demoninator", we have to re-create + // this array from the componentType every time -- since it will be different + Class componentType = sub.getHandledMessageTypes()[0].getComponentType(); + Object[] asArray = (Object[]) Array.newInstance(componentType, 3); + asArray[0] = message1; + asArray[1] = message2; + asArray[2] = message3; + // this catches all exception types subsPublished |= sub.publishToSubscription(this, asArray); } diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 5275d9b..2cb8231 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -2,7 +2,6 @@ package dorkbox.util.messagebus; import java.lang.reflect.Array; import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -12,7 +11,6 @@ import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ReflectionUtils; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.SubscriptionHolder; -import dorkbox.util.messagebus.common.SuperClassIterator; import dorkbox.util.messagebus.listener.MessageHandler; import dorkbox.util.messagebus.listener.MetadataReader; import dorkbox.util.messagebus.subscription.Subscription; @@ -61,6 +59,7 @@ public class SubscriptionManager { private final ConcurrentMap, StrongConcurrentSetV8> subscriptionsPerListener; private final Map, Class> arrayVersionCache; + private final Map, Boolean> isArrayCache; private final Map, StrongConcurrentSetV8>> superClassesCache; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. @@ -71,6 +70,7 @@ public class SubscriptionManager { private final Map, StrongConcurrentSetV8> varArgSubscriptions; private final Map, StrongConcurrentSetV8> varArgSuperClassSubscriptions; + private final HashMapTree, StrongConcurrentSetV8> varArgSuperClassSubscriptionsMulti; // stripe size of maps for concurrency private final int STRIPE_SIZE; @@ -96,6 +96,7 @@ public class SubscriptionManager { // modified by N threads { this.arrayVersionCache = new ConcurrentHashMapV8, Class>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.isArrayCache = new ConcurrentHashMapV8, Boolean>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.superClassesCache = new ConcurrentHashMapV8, StrongConcurrentSetV8>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. @@ -107,6 +108,7 @@ public class SubscriptionManager { // it's a hit on SUB/UNSUB, but improves performance of handlers this.varArgSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.varArgSuperClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); } this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); @@ -121,11 +123,10 @@ public class SubscriptionManager { this.subscriptionsPerListener.clear(); this.superClassesCache.clear(); - this.superClassSubscriptions.clear(); - this.arrayVersionCache.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); + this.isArrayCache.clear(); + + clearConcurrentCollections(); } public void subscribe(Object listener) { @@ -141,9 +142,7 @@ public class SubscriptionManager { } // these are concurrent collections - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); + clearConcurrentCollections(); // no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go. @@ -170,7 +169,7 @@ public class SubscriptionManager { messageHandler = current.getValue(); current = current.next(); - Collection subsPerType = null; + StrongConcurrentSetV8 subsPerType = null; // now add this subscription to each of the handled types Class[] types = messageHandler.getHandledMessages(); @@ -183,44 +182,44 @@ public class SubscriptionManager { } else { subsPerType = this.subHolderConcurrent.get(); this.subHolderConcurrent.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE)); - getSuperClass(types[0]); + getSuperClasses(types[0]); } break; } case 2: { - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subsPerType = this.subHolder.get(); this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); - getSuperClass(types[0]); - getSuperClass(types[1]); + getSuperClasses(types[0]); + getSuperClasses(types[1]); } break; } case 3: { - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subsPerType = this.subHolder.get(); this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); - getSuperClass(types[0]); - getSuperClass(types[1]); - getSuperClass(types[2]); + getSuperClasses(types[0]); + getSuperClasses(types[1]); + getSuperClasses(types[2]); } break; } default: { - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subsPerType = this.subHolder.get(); this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); for (Class c : types) { - getSuperClass(c); + getSuperClasses(c); } } break; @@ -263,9 +262,8 @@ public class SubscriptionManager { } // these are concurrent collections - this.superClassSubscriptions.clear(); - this.varArgSubscriptions.clear(); - this.varArgSuperClassSubscriptions.clear(); + clearConcurrentCollections(); + StrongConcurrentSetV8 subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { @@ -280,12 +278,19 @@ public class SubscriptionManager { } } + private void clearConcurrentCollections() { + this.superClassSubscriptions.clear(); + this.varArgSubscriptions.clear(); + this.varArgSuperClassSubscriptions.clear(); + this.varArgSuperClassSubscriptionsMulti.clear(); + } + /** * race conditions will result in duplicate answers, which we don't care if happens * never returns null * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) */ - private StrongConcurrentSetV8> getSuperClass(Class clazz) { + private StrongConcurrentSetV8> getSuperClasses(Class clazz) { // this is never reset, since it never needs to be. Map, StrongConcurrentSetV8>> local = this.superClassesCache; @@ -311,6 +316,7 @@ public class SubscriptionManager { // race conditions will result in duplicate answers, which we don't care if happens local.put(clazz, set); + return set; } return superTypes; @@ -333,6 +339,21 @@ public class SubscriptionManager { return clazz; } + /** + * Cache the values of JNI method, isArray(c) + * @return true if the class c is an array type + */ + @SuppressWarnings("boxing") + public final boolean isArray(Class c) { + Boolean isArray = this.isArrayCache.get(c); + if (isArray == null) { + boolean b = c.isArray(); + this.isArrayCache.put(c, b); + return b; + } + return isArray; + } + // CAN RETURN NULL public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType) { @@ -376,7 +397,6 @@ public class SubscriptionManager { StrongConcurrentSetV8 subs = local2.get(arrayVersion); if (subs != null && !subs.isEmpty()) { - current = subs.head; while (current != null) { sub = current.getValue(); @@ -396,7 +416,7 @@ public class SubscriptionManager { return subsPerType; } - // CAN RETURN NULL + // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions public StrongConcurrentSetV8 getVarArgSuperSubscriptions(Class varArgType) { @@ -408,7 +428,7 @@ public class SubscriptionManager { if (subsPerType == null) { // this caches our array type. This is never cleared. Class arrayVersion = getArrayClass(varArgType); - StrongConcurrentSetV8> types = getSuperClass(arrayVersion); + StrongConcurrentSetV8> types = getSuperClasses(arrayVersion); if (types.isEmpty()) { local.put(varArgType, EMPTY_SUBS); return null; @@ -430,7 +450,6 @@ public class SubscriptionManager { StrongConcurrentSetV8 subs = local2.get(superClass); if (subs != null && !subs.isEmpty()) { - current = subs.head; while (current != null) { sub = current.getValue(); @@ -451,6 +470,82 @@ public class SubscriptionManager { return subsPerType; } + // CAN NOT RETURN NULL + // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI + // and then, returns the array'd version subscriptions + public StrongConcurrentSetV8 getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { + HashMapTree, StrongConcurrentSetV8> local = this.varArgSuperClassSubscriptionsMulti; + + // whenever our subscriptions change, this map is cleared. + HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); + StrongConcurrentSetV8 subsPerType = null; + + // we DO NOT care about duplicate, because the answers will be the same + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { + + + + + StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); + if (putIfAbsent != null) { + // someone beat us + subsPerType = putIfAbsent; + } + } + + return subsPerType; + } + + // CAN NOT RETURN NULL + // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI + // and then, returns the array'd version subscriptions + public StrongConcurrentSetV8 getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + HashMapTree, StrongConcurrentSetV8> local = this.varArgSuperClassSubscriptionsMulti; + + // whenever our subscriptions change, this map is cleared. + HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); + StrongConcurrentSetV8 subsPerType = null; + + // we DO NOT care about duplicate, because the answers will be the same + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { + // the message class types are not the same, so look for a common superClass varArg subscription. + // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages + StrongConcurrentSetV8 varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); + + subsPerType = new StrongConcurrentSetV8(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + + ISetEntry current; + Subscription sub; + + + + current = varargSuperSubscriptions1.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + + if (varargSuperSubscriptions2.contains(sub)) { + subsPerType.add(sub); + } + } + + StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); + if (putIfAbsent != null) { + // someone beat us + subsPerType = putIfAbsent; + } + } + + return subsPerType; + } + // ALSO checks to see if the superClass accepts subtypes. public final StrongConcurrentSetV8 getSuperSubscriptions(Class superType) { @@ -461,7 +556,7 @@ public class SubscriptionManager { if (subsPerType == null) { // this caches our class hierarchy. This is never cleared. - StrongConcurrentSetV8> types = getSuperClass(superType); + StrongConcurrentSetV8> types = getSuperClasses(superType); if (types.isEmpty()) { local.put(superType, EMPTY_SUBS); return null; @@ -473,8 +568,9 @@ public class SubscriptionManager { ISetEntry current; Subscription sub; - ISetEntry> current1; + ISetEntry> current1 = null; Class superClass; + current1 = types.head; while (current1 != null) { superClass = current1.getValue(); @@ -528,26 +624,19 @@ public class SubscriptionManager { HashMapTree, StrongConcurrentSetV8> leaf1; HashMapTree, StrongConcurrentSetV8> leaf2; -// ISetEntry> current1; -// Class eventSuperType1; -// -// current1 = types1.head; -// while (current1 != null) { -// eventSuperType1 = current1.getValue(); -// current1 = current1.next(); -// -// } - - - - Iterator> iterator1 = new SuperClassIterator(superType1, types1); - Iterator> iterator2; - + ISetEntry> current1 = null; Class eventSuperType1; + + ISetEntry> current2 = null; Class eventSuperType2; - while (iterator1.hasNext()) { - eventSuperType1 = iterator1.next(); + if (types1 != null) { + current1 = types1.head; + } + while (current1 != null) { + eventSuperType1 = current1.getValue(); + current1 = current1.next(); + boolean type1Matches = eventSuperType1 == superType1; if (type1Matches) { continue; @@ -555,10 +644,13 @@ public class SubscriptionManager { leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); if (leaf1 != null) { - iterator2 = new SuperClassIterator(superType2, types2); + if (types2 != null) { + current2 = types2.head; + } + while (current2 != null) { + eventSuperType2 = current2.getValue(); + current2 = current2.next(); - while (iterator2.hasNext()) { - eventSuperType2 = iterator2.next(); if (type1Matches && eventSuperType2 == superType2) { continue; } @@ -604,27 +696,33 @@ public class SubscriptionManager { // if the leaf exists, then the value exists. subsPerType = subsPerTypeLeaf.getValue(); } else { - Collection> types1 = this.superClassesCache.get(superType1); - Collection> types2 = this.superClassesCache.get(superType2); - Collection> types3 = this.superClassesCache.get(superType3); + StrongConcurrentSetV8> types1 = this.superClassesCache.get(superType1); + StrongConcurrentSetV8> types2 = this.superClassesCache.get(superType2); + StrongConcurrentSetV8> types3 = this.superClassesCache.get(superType3); subsPerType = new StrongConcurrentSetV8(16, LOAD_FACTOR, this.STRIPE_SIZE); - Collection subs; + StrongConcurrentSetV8 subs; HashMapTree, StrongConcurrentSetV8> leaf1; HashMapTree, StrongConcurrentSetV8> leaf2; HashMapTree, StrongConcurrentSetV8> leaf3; - Iterator> iterator1 = new SuperClassIterator(superType1, types1); - Iterator> iterator2; - Iterator> iterator3; - + ISetEntry> current1 = null; Class eventSuperType1; + + ISetEntry> current2 = null; Class eventSuperType2; + + ISetEntry> current3 = null; Class eventSuperType3; - while (iterator1.hasNext()) { - eventSuperType1 = iterator1.next(); + if (types1 != null) { + current1 = types1.head; + } + while (current1 != null) { + eventSuperType1 = current1.getValue(); + current1 = current1.next(); + boolean type1Matches = eventSuperType1 == superType1; if (type1Matches) { continue; @@ -632,10 +730,13 @@ public class SubscriptionManager { leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); if (leaf1 != null) { - iterator2 = new SuperClassIterator(superType2, types2); + if (types2 != null) { + current2 = types2.head; + } + while (current2 != null) { + eventSuperType2 = current2.getValue(); + current2 = current2.next(); - while (iterator2.hasNext()) { - eventSuperType2 = iterator2.next(); boolean type12Matches = type1Matches && eventSuperType2 == superType2; if (type12Matches) { continue; @@ -644,10 +745,13 @@ public class SubscriptionManager { leaf2 = leaf1.getLeaf(eventSuperType2); if (leaf2 != null) { - iterator3 = new SuperClassIterator(superType3, types3); + if (types3 != null) { + current3 = types3.head; + } + while (current3 != null) { + eventSuperType3 = current3.getValue(); + current3 = current3.next(); - while (iterator3.hasNext()) { - eventSuperType3 = iterator3.next(); if (type12Matches && eventSuperType3 == superType3) { continue; } @@ -679,4 +783,6 @@ public class SubscriptionManager { return subsPerType; } + + } diff --git a/src/main/java/dorkbox/util/messagebus/common/SuperClassIterator.java b/src/main/java/dorkbox/util/messagebus/common/SuperClassIterator.java deleted file mode 100644 index f9fd0ce..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/SuperClassIterator.java +++ /dev/null @@ -1,54 +0,0 @@ -package dorkbox.util.messagebus.common; - -import java.util.Collection; -import java.util.Iterator; - -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -public class SuperClassIterator implements Iterator> { - private final Iterator> iterator; - private Class clazz; - - public SuperClassIterator(Class clazz, Collection> types) { - this.clazz = clazz; - if (types != null) { - this.iterator = types.iterator(); - } else { - this.iterator = null; - } - } - - @Override - public boolean hasNext() { - if (this.clazz != null) { - return true; - } - - if (this.iterator != null) { - return this.iterator.hasNext(); - } - - return false; - } - - @Override - public Class next() { - if (this.clazz != null) { - Class clazz2 = this.clazz; - this.clazz = null; - - return clazz2; - } - - if (this.iterator != null) { - return this.iterator.next(); - } - - return null; - } - - @Override - public void remove() { - throw new NotImplementedException(); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 3d285a8..be61744 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -92,6 +92,10 @@ public class Subscription { return this.listeners.size(); } + public int count =0; + public int getCount() { + return count; + } /** * @return true if there were listeners for this publication, false if there was nothing */ @@ -109,43 +113,43 @@ public class Subscription { while (current != null) { listener = current.getValue(); current = current.next(); - - try { - invocation.invoke(listener, handler, handleIndex, message); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0]) - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } +count++; +// try { +// invocation.invoke(listener, handler, handleIndex, message); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + message.getClass() +// + "Expected: " + handler.getParameterTypes()[0]) +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } } return true; } diff --git a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java index 1ffce64..c032ddb 100644 --- a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java +++ b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java @@ -42,7 +42,7 @@ public class MultiMessageTest extends MessageBusTest { bus.publish(1, 2, "s"); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); - assertEquals(12, count.get()); + assertEquals(13, count.get()); count.set(0); @@ -52,7 +52,17 @@ public class MultiMessageTest extends MessageBusTest { bus.publishAsync(1, 2, "s"); bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6}); - assertEquals(12, count.get()); + while (bus.hasPendingMessages()) { + try { + Thread.sleep(ConcurrentUnits); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + // log.error(e); + } + } + + assertEquals(13, count.get()); bus.shutdown(); diff --git a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java index b9cd326..81fd205 100644 --- a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java @@ -20,8 +20,6 @@ public class PerformanceTest { public static final int CONCURRENCY_LEVEL = 2; - private static long count = 0; - protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { @Override public void handleError(PublicationError error) { @@ -42,15 +40,14 @@ public class PerformanceTest { ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - long num = 0; - while (num < Long.MAX_VALUE) { - bus.publishAsync(num++); + Long num = Long.valueOf(7L); + while (true) { + bus.publishAsync(num); } }}, CONCURRENCY_LEVEL); bus.shutdown(); - System.err.println("Count: " + count); } public PerformanceTest() { @@ -61,7 +58,6 @@ public class PerformanceTest { @Handler public void handleSync(Long o1) { // System.err.println(Long.toString(o1)); - count++; } } }