From 56728b327dd61e72372674f74962c4dda168f99c Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 7 Jun 2015 00:55:32 +0200 Subject: [PATCH] WIP getting 2/3 super vararg implemented --- .../dorkbox/util/messagebus/IMessageBus.java | 2 +- .../dorkbox/util/messagebus/MatcherAll.java | 27 ++ .../dorkbox/util/messagebus/MatcherExact.java | 27 ++ .../MatcherExactWithSuperTypes.java | 27 ++ .../util/messagebus/MultiMBassador.java | 94 +---- .../util/messagebus/common/VarArgUtils.java | 329 ++++++++---------- .../util/messagebus/subscription/Matcher.java | 8 +- .../subscription/SubscriptionManager.java | 165 ++++++++- .../util/messagebus/MultiMessageTest.java | 22 +- 9 files changed, 430 insertions(+), 271 deletions(-) create mode 100644 src/main/java/dorkbox/util/messagebus/MatcherAll.java create mode 100644 src/main/java/dorkbox/util/messagebus/MatcherExact.java create mode 100644 src/main/java/dorkbox/util/messagebus/MatcherExactWithSuperTypes.java diff --git a/src/main/java/dorkbox/util/messagebus/IMessageBus.java b/src/main/java/dorkbox/util/messagebus/IMessageBus.java index 8557729..5888e4c 100644 --- a/src/main/java/dorkbox/util/messagebus/IMessageBus.java +++ b/src/main/java/dorkbox/util/messagebus/IMessageBus.java @@ -60,7 +60,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport { - enum Mode { + enum PublishMode { /** * Will only publish to listeners with this exact message signature. This is the fastest */ diff --git a/src/main/java/dorkbox/util/messagebus/MatcherAll.java b/src/main/java/dorkbox/util/messagebus/MatcherAll.java new file mode 100644 index 0000000..094be4b --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/MatcherAll.java @@ -0,0 +1,27 @@ +package dorkbox.util.messagebus; + +import dorkbox.util.messagebus.subscription.Matcher; +import dorkbox.util.messagebus.subscription.SubscriptionManager; + +public class MatcherAll implements Matcher { + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable { + subscriptionManager.publishAll(message1); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable { + subscriptionManager.publishAll(message1, message2); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3) + throws Throwable { + subscriptionManager.publishAll(message1, message2, message3); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable { + subscriptionManager.publishAll(messages); + } +} diff --git a/src/main/java/dorkbox/util/messagebus/MatcherExact.java b/src/main/java/dorkbox/util/messagebus/MatcherExact.java new file mode 100644 index 0000000..e7857fa --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/MatcherExact.java @@ -0,0 +1,27 @@ +package dorkbox.util.messagebus; + +import dorkbox.util.messagebus.subscription.Matcher; +import dorkbox.util.messagebus.subscription.SubscriptionManager; + +public class MatcherExact implements Matcher { + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable { + subscriptionManager.publishExact(message1); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable { + subscriptionManager.publishExact(message1, message2); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3) + throws Throwable { + subscriptionManager.publishExact(message1, message2, message3); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable { + subscriptionManager.publishExact(messages); + } +} diff --git a/src/main/java/dorkbox/util/messagebus/MatcherExactWithSuperTypes.java b/src/main/java/dorkbox/util/messagebus/MatcherExactWithSuperTypes.java new file mode 100644 index 0000000..bc27997 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/MatcherExactWithSuperTypes.java @@ -0,0 +1,27 @@ +package dorkbox.util.messagebus; + +import dorkbox.util.messagebus.subscription.Matcher; +import dorkbox.util.messagebus.subscription.SubscriptionManager; + +public class MatcherExactWithSuperTypes implements Matcher { + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable { + subscriptionManager.publishExactAndSuper(message1); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable { + subscriptionManager.publishExactAndSuper(message1, message2); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3) + throws Throwable { + subscriptionManager.publishExactAndSuper(message1, message2, message3); + } + + @Override + public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable { + subscriptionManager.publishExactAndSuper(messages); + } +} diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 8286719..5999df4 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -51,14 +51,14 @@ public class MultiMBassador implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MultiMBassador(int numberOfThreads) { - this(Mode.ExactWithSuperTypes, numberOfThreads); + this(PublishMode.ExactWithSuperTypesAndVarArgs, numberOfThreads); } /** - * @param mode Specifies which mode to operate the publication of messages. + * @param publishMode Specifies which publishMode to operate the publication of messages. * @param numberOfThreads how many threads to have for dispatching async messages */ - public MultiMBassador(Mode mode, int numberOfThreads) { + public MultiMBassador(final PublishMode publishMode, int numberOfThreads) { if (numberOfThreads < 2) { numberOfThreads = 2; // at LEAST 2 threads } @@ -66,80 +66,16 @@ public class MultiMBassador implements IMessageBus { this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads); - switch (mode) { + switch (publishMode) { case Exact: - subscriptionMatcher = new Matcher() { - @Override - public void publish(final Object message1) throws Throwable { - subscriptionManager.publishExact(message1); - } - - @Override - public void publish(final Object message1, final Object message2) throws Throwable { - subscriptionManager.publishExact(message1, message2); - } - - @Override - public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - subscriptionManager.publishExact(message1, message2, message3); - } - - @Override - public void publish(final Object[] messages) throws Throwable { - subscriptionManager.publishExact(messages); - } - - }; + subscriptionMatcher = new MatcherExact(); break; case ExactWithSuperTypes: - subscriptionMatcher = new Matcher() { - @Override - public void publish(final Object message1) throws Throwable { - subscriptionManager.publishExactAndSuper(message1); - } - - @Override - public void publish(final Object message1, final Object message2) throws Throwable { - subscriptionManager.publishExactAndSuper(message1, message2); - } - - @Override - public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - subscriptionManager.publishExactAndSuper(message1, message2, message3); - } - - @Override - public void publish(final Object[] messages) throws Throwable { - subscriptionManager.publishExactAndSuper(messages); - } - }; + subscriptionMatcher = new MatcherExactWithSuperTypes(); break; case ExactWithSuperTypesAndVarArgs: default: - subscriptionMatcher = new Matcher() { - @Override - public void publish(final Object message1) throws Throwable { - subscriptionManager.publishAll(message1); - } - - @Override - public void publish(final Object message1, final Object message2) throws Throwable { - // we don't support var-args for multiple messages (var-args can only be a single type) - subscriptionManager.publishExactAndSuper(message1, message2); - } - - @Override - public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - // we don't support var-args for multiple messages (var-args can only be a single type) - subscriptionManager.publishExactAndSuper(message1, message2, message3); - } - - @Override - public void publish(final Object[] messages) throws Throwable { - // we don't support var-args for multiple messages (var-args can only be a single type) - subscriptionManager.publishExactAndSuper(messages); - } - }; + subscriptionMatcher = new MatcherAll(); } this.threads = new ArrayDeque(numberOfThreads); @@ -191,11 +127,17 @@ public class MultiMBassador implements IMessageBus { .setCause(e).setPublishedObject(node.item1, node.item2)); break; } - default: { + case 3: { handlePublicationError( new PublicationError().setMessage("Thread interrupted while processing message") .setCause(e) .setPublishedObject(node.item1, node.item2, node.item3)); + break; + } + default: { + handlePublicationError( + new PublicationError().setMessage("Thread interrupted while processing message") + .setCause(e).setPublishedObject(node.item1)); } } } @@ -265,7 +207,7 @@ public class MultiMBassador implements IMessageBus { @Override public void publish(final Object message) { try { - subscriptionMatcher.publish(message); + subscriptionMatcher.publish(subscriptionManager, message); } catch (Throwable e) { handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) .setPublishedObject(message)); @@ -275,7 +217,7 @@ public class MultiMBassador implements IMessageBus { @Override public void publish(final Object message1, final Object message2) { try { - subscriptionMatcher.publish(message1, message2); + subscriptionMatcher.publish(subscriptionManager, message1, message2); } catch (Throwable e) { handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) .setPublishedObject(message1, message2)); @@ -285,7 +227,7 @@ public class MultiMBassador implements IMessageBus { @Override public void publish(final Object message1, final Object message2, final Object message3) { try { - subscriptionMatcher.publish(message1, message2, message3); + subscriptionMatcher.publish(subscriptionManager, message1, message2, message3); } catch (Throwable e) { handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) .setPublishedObject(message1, message2, message3)); @@ -295,7 +237,7 @@ public class MultiMBassador implements IMessageBus { @Override public void publish(final Object[] messages) { try { - subscriptionMatcher.publish(messages); + subscriptionMatcher.publish(subscriptionManager, messages); } catch (Throwable e) { handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) .setPublishedObject(messages)); diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index b5eeb45..7d5a042 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -1,18 +1,16 @@ package dorkbox.util.messagebus.common; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.SubscriptionUtils; import java.util.ArrayList; -import java.util.List; import java.util.Map; -public class VarArgUtils { +public final class VarArgUtils { private final Map, ArrayList> varArgSubscriptions; - private final Map, List> varArgSuperClassSubscriptions; - private final HashMapTree, ConcurrentSet> varArgSuperClassSubscriptionsMulti; + private final Map, ArrayList> varArgSuperClassSubscriptions; + private final HashMapTree, ArrayList> varArgSuperClassSubscriptionsMulti; private final SubscriptionHolder subHolderConcurrent; @@ -25,8 +23,7 @@ public class VarArgUtils { public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils, - Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, - int stripeSize) { + Map, ArrayList> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) { this.utils = utils; this.superClassUtils = superClassUtils; @@ -35,8 +32,8 @@ public class VarArgUtils { this.stripeSize = stripeSize; this.varArgSubscriptions = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); - this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, List>(16, loadFactor, stripeSize); - this.varArgSuperClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, loadFactor); + this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, ArrayList>(16, loadFactor, stripeSize); + this.varArgSuperClassSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); this.subHolderConcurrent = new SubscriptionHolder(); } @@ -53,7 +50,8 @@ public class VarArgUtils { // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions public Subscription[] getVarArgSubscriptions(Class messageClass) { - Map, ArrayList> local = this.varArgSubscriptions; + // whenever our subscriptions change, this map is cleared. + final Map, ArrayList> local = this.varArgSubscriptions; ArrayList varArgSubs = local.get(messageClass); @@ -63,7 +61,7 @@ public class VarArgUtils { ArrayList subs = this.subscriptionsPerMessageSingle.get(arrayVersion); if (subs != null) { - int length = subs.size(); + final int length = subs.size(); varArgSubs = new ArrayList(length); Subscription sub; @@ -79,194 +77,163 @@ public class VarArgUtils { } } -// return varArgSubs; - - return null; + final Subscription[] subscriptions = new Subscription[varArgSubs.size()]; + varArgSubs.toArray(subscriptions); + return subscriptions; + } + + // CAN NOT RETURN NULL + // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI + // and then, returns the array'd version superclass subscriptions + public Subscription[] getVarArgSuperSubscriptions(final Class messageClass) { + final ArrayList subs = getVarArgSuperSubscriptions_List(messageClass); + + final Subscription[] subscriptions = new Subscription[subs.size()]; + subs.toArray(subscriptions); + return subscriptions; + } + + private ArrayList getVarArgSuperSubscriptions_List(final Class messageClass) { // whenever our subscriptions change, this map is cleared. -// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// ConcurrentSet subsPerType = subHolderConcurrent.publish(); -// -// // cache our subscriptions for super classes, so that their access can be fast! -// ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); -// if (putIfAbsent == null) { -// // we are the first one in the map -// subHolderConcurrent.set(subHolderConcurrent.initialValue()); -// -// -// Iterator iterator; -// Subscription sub; -// -// Collection subs = this.subscriptionsPerMessageSingle.publish(arrayVersion); -// if (subs != null) { -// for (iterator = subs.iterator(); iterator.hasNext();) { -// sub = iterator.next(); -// if (sub.acceptsVarArgs()) { -// subsPerType.add(sub); -// } -// } -// } -// return subsPerType; -// } else { -// // someone beat us -// return putIfAbsent; -// } + final Map, ArrayList> local = this.varArgSuperClassSubscriptions; -// return null; - } + ArrayList varArgSuperSubs = local.get(messageClass); + + if (varArgSuperSubs == null) { + // this gets (and caches) our array type. This is never cleared. + final Class arrayVersion = this.superClassUtils.getArrayClass(messageClass); + final Class[] types = this.superClassUtils.getSuperClasses(arrayVersion); + + final int typesLength = types.length; + varArgSuperSubs = new ArrayList(typesLength); + + if (typesLength == 0) { + local.put(messageClass, varArgSuperSubs); + return varArgSuperSubs; + } + Class type; + Subscription sub; + ArrayList subs; + int length; - // CAN RETURN NULL - // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI - // and then, returns the array'd version subscriptions - public Subscription[] getVarArgSuperSubscriptions(Class messageClass) { -// // whenever our subscriptions change, this map is cleared. -// ConcurrentMap, ConcurrentSet> local = this.varArgSuperClassSubscriptions; -// -// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// ConcurrentSet subsPerType = subHolderConcurrent.publish(); -// -// // cache our subscriptions for super classes, so that their access can be fast! -// ConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); -// -// if (putIfAbsent == null) { -// // we are the first one in the map -// subHolderConcurrent.set(subHolderConcurrent.initialValue()); -// -// Class arrayVersion = this.utils.getArrayClass(messageClass); -// Collection> types = this.utils.getSuperClasses(arrayVersion, true); -// if (types.isEmpty()) { -// return subsPerType; -// } -// -// Map, Collection> local2 = this.subscriptionsPerMessageSingle; -// -// Iterator> iterator; -// Class superClass; -// -// Iterator subIterator; -// Subscription sub; -// -// -// for (iterator = types.iterator(); iterator.hasNext();) { -// superClass = iterator.next(); -// -// Collection subs = local2.publish(superClass); -// if (subs != null) { -// for (subIterator = subs.iterator(); subIterator.hasNext();) { -// sub = subIterator.next(); -// if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { -// subsPerType.add(sub); -// } -// } -// } -// } -// return subsPerType; -// } else { -// // someone beat us -// return putIfAbsent; -// } + for (int i = 0; i < typesLength; i++) { + type = types[i]; + subs = this.subscriptionsPerMessageSingle.get(type); - return null; + if (subs != null) { + length = subs.size(); + varArgSuperSubs = new ArrayList(length); + + for (int j = 0; j < length; j++) { + sub = subs.get(j); + + if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { + varArgSuperSubs.add(sub); + } + } + + } + } + + local.put(messageClass, varArgSuperSubs); + } + + return varArgSuperSubs; } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI - // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { -// HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; -// -// // whenever our subscriptions change, this map is cleared. -// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); -// ConcurrentSet subsPerType = null; -// -// // we DO NOT care about duplicate, because the answers will be the same -// if (subsPerTypeLeaf != null) { -// // if the leaf exists, then the value exists. -// subsPerType = subsPerTypeLeaf.getValue(); -// } else { -// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// subsPerType = subHolderConcurrent.publish(); -// -// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); -// if (putIfAbsent != null) { -// // someone beat us -// subsPerType = putIfAbsent; -// } else { -// // the message class types are not the same, so look for a common superClass varArg subscription. -// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages -// ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); -// ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); -// -// Iterator iterator; -// Subscription sub; -// -// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { -// sub = iterator.next(); -// if (varargSuperSubscriptions2.contains(sub)) { -// subsPerType.add(sub); -// } -// } -// -// subHolderConcurrent.set(subHolderConcurrent.initialValue()); -// } -// } -// -// return subsPerType; - return null; + // and then, returns the array'd version superclass subscriptions + public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2) { + // whenever our subscriptions change, this map is cleared. + final HashMapTree, ArrayList> local = this.varArgSuperClassSubscriptionsMulti; + + HashMapTree, ArrayList> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); + ArrayList subsPerType; + + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } + else { + // the message class types are not the same, so look for a common superClass varArg subscription. + // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); + + final int size1 = varargSuperSubscriptions1.size(); + final int size2 = varargSuperSubscriptions2.size(); + + subsPerType = new ArrayList(size1 + size2); + + Subscription sub; + for (int i = 0; i < size1; i++) { + sub = varargSuperSubscriptions1.get(i); + + if (varargSuperSubscriptions2.contains(sub)) { + subsPerType.add(sub); + } + } + + subsPerType.trimToSize(); + + local.put(subsPerType, messageClass1, messageClass2); + } + + final Subscription[] subscriptions = new Subscription[subsPerType.size()]; + subsPerType.toArray(subscriptions); + return subscriptions; } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI - // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, - final Class messageClass3) { -// HashMapTree, ConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; -// -// // whenever our subscriptions change, this map is cleared. -// HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); -// ConcurrentSet subsPerType = null; -// -// // we DO NOT care about duplicate, because the answers will be the same -// if (subsPerTypeLeaf != null) { -// // if the leaf exists, then the value exists. -// subsPerType = subsPerTypeLeaf.getValue(); -// } else { -// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; -// subsPerType = subHolderConcurrent.publish(); -// -// ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); -// if (putIfAbsent != null) { -// // someone beat us -// subsPerType = putIfAbsent; -// } else { -// // the message class types are not the same, so look for a common superClass varArg subscription. -// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages -// ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); -// ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); -// ConcurrentSet varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); -// -// Iterator iterator; -// Subscription sub; -// -// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { -// sub = iterator.next(); -// if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { -// subsPerType.add(sub); -// } -// } -// -// subHolderConcurrent.set(subHolderConcurrent.initialValue()); -// } -// } -// -// return subsPerType; + // and then, returns the array'd version superclass subscriptions + public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, + final Class messageClass3) { + // whenever our subscriptions change, this map is cleared. + final HashMapTree, ArrayList> local = this.varArgSuperClassSubscriptionsMulti; - return null; + HashMapTree, ArrayList> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); + ArrayList subsPerType; + + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } + else { + // the message class types are not the same, so look for a common superClass varArg subscription. + // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); + final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3); + + final int size1 = varargSuperSubscriptions1.size(); + final int size2 = varargSuperSubscriptions2.size(); + + subsPerType = new ArrayList(size1 + size2); + + Subscription sub; + for (int i = 0; i < size1; i++) { + sub = varargSuperSubscriptions1.get(i); + + if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { + subsPerType.add(sub); + } + } + + subsPerType.trimToSize(); + + local.put(subsPerType, messageClass1, messageClass2, messageClass3); + } + + final Subscription[] subscriptions = new Subscription[subsPerType.size()]; + subsPerType.toArray(subscriptions); + return subscriptions; } - } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java index 6b1fc5c..54185fa 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java @@ -1,11 +1,11 @@ package dorkbox.util.messagebus.subscription; public interface Matcher { - void publish(Object message) throws Throwable; + void publish(SubscriptionManager subscriptionManager, Object message1) throws Throwable; - void publish(Object message1, Object message2) throws Throwable; + void publish(SubscriptionManager subscriptionManager, Object message1, Object message2) throws Throwable; - void publish(Object message1, Object message2, Object message3) throws Throwable; + void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3) throws Throwable; - void publish(Object[] messages) throws Throwable; + void publish(SubscriptionManager subscriptionManager, Object[] messages) throws Throwable; } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index ec9c9e3..3b69f32 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -336,7 +336,7 @@ public final class SubscriptionManager { } } else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; + collection = superSubscriptions; } if (collection != null) { @@ -622,7 +622,7 @@ public final class SubscriptionManager { for (int i = 0; i < length; i++) { sub = varArgSubscriptions[i]; - sub.publish(message); + sub.publish(asArray); } stamp = lock.readLock(); @@ -672,6 +672,167 @@ public final class SubscriptionManager { } } + public void publishAll(final Object message1, final Object message2) throws Throwable { + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + + final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2); // can return null + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, message2); + } + + // publish to var arg, only if not already an array AND we are all of the same type + if (varArgPossibility.get() && messageClass1 == messageClass2 && !messageClass1.isArray()) { + long stamp = lock.readLock(); + final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null + lock.unlockRead(stamp); + + if (varArgSubscriptions != null) { + final int length = varArgSubscriptions.length; + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); + asArray[0] = message1; + asArray[1] = message2; + + + for (int i = 0; i < length; i++) { + sub = varArgSubscriptions[i]; + sub.publish(asArray); + } + + stamp = lock.readLock(); + // now publish array based superClasses (but only if those ALSO accept vararg) + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); + lock.unlockRead(stamp); + + if (varArgSuperSubscriptions != null) { + for (int i = 0; i < length; i++) { + sub = varArgSuperSubscriptions[i]; + sub.publish(asArray); + } + } + } + else { + stamp = lock.readLock(); + + // now publish array based superClasses (but only if those ALSO accept vararg) + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); + lock.unlockRead(stamp); + + if (varArgSuperSubscriptions != null) { + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); + asArray[0] = message1; + asArray[1] = message2; + + for (int i = 0; i < varArgSuperSubscriptions.length; i++) { + sub = varArgSuperSubscriptions[i]; + sub.publish(asArray); + } + } + } + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } + + public void publishAll(final Object message1, final Object message2, final Object message3) throws Throwable { + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + final Class messageClass3 = message3.getClass(); + + final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, message2, message3); + } + + // publish to var arg, only if not already an array AND we are all of the same type + if (varArgPossibility.get() && messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) { + long stamp = lock.readLock(); + final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null + lock.unlockRead(stamp); + + if (varArgSubscriptions != null) { + final int length = varArgSubscriptions.length; + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3); + asArray[0] = message1; + asArray[1] = message2; + asArray[2] = message3; + + + for (int i = 0; i < length; i++) { + sub = varArgSubscriptions[i]; + sub.publish(asArray); + } + + stamp = lock.readLock(); + // now publish array based superClasses (but only if those ALSO accept vararg) + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); + lock.unlockRead(stamp); + + if (varArgSuperSubscriptions != null) { + for (int i = 0; i < length; i++) { + sub = varArgSuperSubscriptions[i]; + sub.publish(asArray); + } + } + } + else { + stamp = lock.readLock(); + + // now publish array based superClasses (but only if those ALSO accept vararg) + final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1); + lock.unlockRead(stamp); + + if (varArgSuperSubscriptions != null) { + Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2); + asArray[0] = message1; + asArray[1] = message2; + asArray[2] = message3; + + for (int i = 0; i < varArgSuperSubscriptions.length; i++) { + sub = varArgSuperSubscriptions[i]; + sub.publish(asArray); + } + } + } + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } + // public static final Comparator SubscriptionByPriorityDesc = new Comparator() { // @Override diff --git a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java index c032ddb..9833bb1 100644 --- a/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java +++ b/src/test/java/dorkbox/util/messagebus/MultiMessageTest.java @@ -3,12 +3,11 @@ */ package dorkbox.util.messagebus; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.MessageBusTest; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; /** * @author dorkbox, llc @@ -29,6 +28,7 @@ public class MultiMessageTest extends MessageBusTest { bus.publish("s"); bus.publish("s", "s"); bus.publish("s", "s", "s"); + bus.publish(1, "s"); bus.publish(1, 2, "s"); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); @@ -36,9 +36,10 @@ public class MultiMessageTest extends MessageBusTest { bus.subscribe(listener1); - bus.publish("s"); - bus.publish("s", "s"); - bus.publish("s", "s", "s"); + bus.publish("s"); // 4 + bus.publish("s", "s"); // 3 + bus.publish("s", "s", "s"); // 3 + bus.publish(1, "s"); bus.publish(1, 2, "s"); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); @@ -49,6 +50,7 @@ public class MultiMessageTest extends MessageBusTest { bus.publishAsync("s"); bus.publishAsync("s", "s"); bus.publishAsync("s", "s", "s"); + bus.publish(1, "s"); bus.publishAsync(1, 2, "s"); bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6}); @@ -70,6 +72,12 @@ public class MultiMessageTest extends MessageBusTest { @SuppressWarnings("unused") public static class Listener { + @Handler + public void handleSync(Object o) { + count.getAndIncrement(); + System.err.println("match Object"); + } + @Handler public void handleSync(String o1) { count.getAndIncrement();