diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index df3fc75..d528d4f 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -6,7 +6,6 @@ import java.util.Iterator; import org.jctools.util.Pow2; -import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; @@ -208,6 +207,7 @@ public class MultiMBassador implements IMessageBus { Subscription sub; + // Run subscriptions if (subscriptions != null) { for (iterator = subscriptions.iterator(); iterator.hasNext();) { @@ -218,17 +218,17 @@ public class MultiMBassador implements IMessageBus { } } - if (!this.forceExactMatches) { - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); - // now get superClasses - if (superSubscriptions != null) { - for (iterator = superSubscriptions.iterator(); iterator.hasNext();) { - sub = iterator.next(); - - // this catches all exception types - sub.publishToSubscription(this, subsPublished, message); - } - } +// if (!this.forceExactMatches) { +// Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); +// // now get superClasses +// if (superSubscriptions != null) { +// for (iterator = superSubscriptions.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// +// // this catches all exception types +// sub.publishToSubscription(this, subsPublished, message); +// } +// } // // publish to var arg, only if not already an array @@ -261,21 +261,21 @@ public class MultiMBassador implements IMessageBus { // } // } // } - } - - if (!subsPublished.bool) { - // Dead Event must EXACTLY MATCH (no subclasses) - Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { - DeadMessage deadMessage = new DeadMessage(message); - - for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) { - sub = iterator.next(); - // this catches all exception types - sub.publishToSubscription(this, subsPublished, deadMessage); - } - } - } +// } +// +// if (!subsPublished.bool) { +// // Dead Event must EXACTLY MATCH (no subclasses) +// Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); +// if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { +// DeadMessage deadMessage = new DeadMessage(message); +// +// for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// // this catches all exception types +// sub.publishToSubscription(this, subsPublished, deadMessage); +// } +// } +// } } @Override diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 341fa20..5e22517 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -142,10 +142,12 @@ public class SubscriptionManager { this.nonListeners.put(listenerClass, Boolean.TRUE); return; } else { - VarArgPossibility varArgPossibility = this.varArgPossibility; - subsPerListener = new ConcurrentSet(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + + VarArgPossibility varArgPossibility = this.varArgPossibility; ConcurrentMap, Collection> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + HashMapTree, Collection> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + Iterator iterator; MessageHandler messageHandler; @@ -154,84 +156,9 @@ public class SubscriptionManager { for (iterator = messageHandlers.iterator(); iterator.hasNext();) { messageHandler = iterator.next(); - // now add this subscription to each of the handled types - Class[] types = messageHandler.getHandledMessages(); - int size = types.length; - - switch (size) { - case 1: { - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - subsPerType = subHolderConcurrent.get(); - - Collection putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType); - if (putIfAbsent != null) { - subsPerType = putIfAbsent; - } else { - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - boolean isArray = this.utils.isArray(types[0]); - // cache the super classes - this.utils.getSuperClasses(types[0], isArray); - if (isArray) { - varArgPossibility.set(true); - } - } - break; - } - case 2: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); - if (putIfAbsent != null) { - subsPerType = putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - // cache the super classes - this.utils.getSuperClasses(types[0]); - this.utils.getSuperClasses(types[1]); - } - break; - } - case 3: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); - if (putIfAbsent != null) { - subsPerType = putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - // cache the super classes - this.utils.getSuperClasses(types[0]); - this.utils.getSuperClasses(types[1]); - this.utils.getSuperClasses(types[2]); - } - break; - } - default: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); - if (putIfAbsent != null) { - subsPerType = putIfAbsent; - } else { - subHolderSingle.set(subHolderSingle.initialValue()); - - Class c; - int length = types.length; - for (int i = 0; i < length; i++) { - c = types[i]; - this.utils.getSuperClasses(c); - } - } - break; - } - } + // this can safely be called concurrently + subsPerType = getSubsPerType(messageHandler, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); // create the subscription Subscription subscription = new Subscription(messageHandler); @@ -255,6 +182,101 @@ public class SubscriptionManager { } } + private final Collection getSubsPerType(MessageHandler messageHandler, + ConcurrentMap, Collection> subsPerMessageSingle, + HashMapTree, Collection> subsPerMessageMulti, + VarArgPossibility varArgPossibility) { + + Class[] types = messageHandler.getHandledMessages(); + int size = types.length; + + ConcurrentSet subsPerType; + + SubscriptionUtils utils = this.utils; + switch (size) { + case 1: { + SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; + subsPerType = subHolderConcurrent.get(); + + Collection putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType); + if (putIfAbsent != null) { + return putIfAbsent; + } else { + subHolderConcurrent.set(subHolderConcurrent.initialValue()); + boolean isArray = utils.isArray(types[0]); + if (isArray) { + varArgPossibility.set(true); + } + + // cache the super classes + utils.getSuperClasses(types[0], isArray); + + return subsPerType; + } + } + case 2: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + subsPerType = subHolderSingle.get(); + + Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); + if (putIfAbsent != null) { + return putIfAbsent; + } else { + subHolderSingle.set(subHolderSingle.initialValue()); + + // cache the super classes + utils.getSuperClasses(types[0]); + utils.getSuperClasses(types[1]); + + return subsPerType; + } + } + case 3: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + subsPerType = subHolderSingle.get(); + + Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); + if (putIfAbsent != null) { + return putIfAbsent; + } else { + subHolderSingle.set(subHolderSingle.initialValue()); + + // cache the super classes + utils.getSuperClasses(types[0]); + utils.getSuperClasses(types[1]); + utils.getSuperClasses(types[2]); + + return subsPerType; + } + } + default: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + subsPerType = subHolderSingle.get(); + + Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); + if (putIfAbsent != null) { + return putIfAbsent; + } else { + subHolderSingle.set(subHolderSingle.initialValue()); + + Class c; + int length = types.length; + for (int i = 0; i < length; i++) { + c = types[i]; + + // cache the super classes + utils.getSuperClasses(c); + } + + return subsPerType; + } + } + } + } + public final void unsubscribe(Object listener) { if (listener == null) { return; @@ -277,6 +299,7 @@ public class SubscriptionManager { for (iterator = subscriptions.iterator(); iterator.hasNext();) { sub = iterator.next(); + sub.unsubscribe(listener); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java index 1ec5329..cbc7b88 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java @@ -30,7 +30,7 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { public ConcurrentSet(int size, float loadFactor, int stripeSize) { super(); - this.entries = new ConcurrentHashMapV8>(size, loadFactor, stripeSize); + this.entries = new ConcurrentHashMapV8>(size, loadFactor, 32); } @Override @@ -39,6 +39,7 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { return false; } + // had to modify the super implementation so we get Node back Node alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER); if (alreadyPresent == null) { // this doesn't already exist @@ -76,7 +77,7 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { @Override public boolean isEmpty() { - return this.entries.isEmpty(); + return super.isEmpty(); } /** @@ -219,12 +220,12 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { @Override public Object[] toArray() { - return this.entries.entrySet().toArray(); + return this.entries.keySet().toArray(); } @Override public T[] toArray(T[] a) { - return this.entries.entrySet().toArray(a); + return this.entries.keySet().toArray(a); } @Override diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 07afd32..0cf815c 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -1,6 +1,5 @@ package dorkbox.util.messagebus.subscription; -import java.lang.reflect.InvocationTargetException; import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; @@ -12,7 +11,6 @@ import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.listener.MessageHandler; /** @@ -93,6 +91,11 @@ public class Subscription { return this.listeners.size(); } + int c = 0; + public int c() { + return this.c; + } + /** * @return true if there were listeners for this publication, false if there was nothing */ @@ -110,42 +113,36 @@ public class Subscription { for (iterator = listeners.iterator(); iterator.hasNext();) { listener = iterator.next(); - try { - invocation.invoke(listener, handler, handleIndex, message); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0]) - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } + this.c++; + +// try { +// invocation.invoke(listener, handler, handleIndex, message); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + message.getClass() +// + "Expected: " + handler.getParameterTypes()[0]) +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The Message handler implementation threw an exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } } booleanHolder.bool = true; } @@ -192,18 +189,10 @@ public class Subscription { // .setMethodName(handler.getMethodNames()[handleIndex]) // .setListener(listener) // .setPublishedObject(message1, message2)); -// } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2)); // } catch (Throwable e) { // errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") +// "The Message handler code threw an exception") // .setCause(e) // .setMethodName(handler.getMethodNames()[handleIndex]) // .setListener(listener) @@ -257,18 +246,10 @@ public class Subscription { // .setMethodName(handler.getMethodNames()[handleIndex]) // .setListener(listener) // .setPublishedObject(message1, message2, message3)); -// } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); // } catch (Throwable e) { // errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") +// "The Message handler code threw an exception") // .setCause(e) // .setMethodName(handler.getMethodNames()[handleIndex]) // .setListener(listener) diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java index a9610b5..740ea64 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedTransferQueue; @@ -51,17 +52,16 @@ public class PerfTest_Collections { // } System.err.println("Done"); - bench(size, new ConcurrentLinkedQueue2()); - bench(size, new ConcurrentSet(size*2, LOAD_FACTOR, 5)); - bench(size, Collections.newSetFromMap(new ConcurrentHashMapV8(size*2, LOAD_FACTOR, 1))); bench(size, new ArrayList(size*2)); + bench(size, new ConcurrentSet(size*2, LOAD_FACTOR, 5)); + bench(size, new ConcurrentLinkedQueue2()); bench(size, new ConcurrentLinkedQueue()); bench(size, new LinkedTransferQueue()); bench(size, new ArrayDeque(size*2)); - bench(size, new ConcurrentLinkedQueue()); bench(size, new LinkedList()); bench(size, new StrongConcurrentSetV8(size*2, LOAD_FACTOR)); bench(size, new StrongConcurrentSet(size*2, LOAD_FACTOR)); + bench(size, Collections.newSetFromMap(new ConcurrentHashMapV8(size*2, LOAD_FACTOR, 1))); bench(size, new HashSet()); // bench(size, new ConcurrentSkipListSet()); // needs comparable } @@ -81,9 +81,13 @@ public class PerfTest_Collections { } } - for (int i=2;i<6;i++) { - long average = averageRun(warmupRuns, runs, set, false, i, REPETITIONS); - if (showOutput) { + if (!showOutput) { + for (int i=2;i<6;i++) { + averageRun(warmupRuns, runs, set, false, i, REPETITIONS); + } + } else { + for (int i=1;i<10;i++) { + long average = averageRun(warmupRuns, runs, set, false, i, REPETITIONS); System.out.format("summary,IteratorPerfTest,%s - %,d (%d)\n", set.getClass().getSimpleName(), average, i); } } @@ -174,12 +178,16 @@ public class PerfTest_Collections { int i = this.repetitions; this.start = System.nanoTime(); + Iterator iterator; + Subscription sub; + // Entry current; // Subscription sub; int count = 0; do { - for (Subscription sub : set) { + for (iterator = set.iterator(); iterator.hasNext();) { + sub = iterator.next(); // if (sub.acceptsSubtypes()) { // count--; // } else {