diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 6e0165a..ac5ca81 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -118,7 +118,7 @@ class MessageBus implements IMessageBus { /** * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) */ - this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler); + this.subscriptionManager = new SubscriptionManager(numberOfThreads); switch (publishMode) { case Exact: diff --git a/src/dorkbox/util/messagebus/common/ClassTree.java b/src/dorkbox/util/messagebus/common/ClassTree.java index d7204cb..966e7f5 100644 --- a/src/dorkbox/util/messagebus/common/ClassTree.java +++ b/src/dorkbox/util/messagebus/common/ClassTree.java @@ -35,9 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; * Date: 2/2/15 */ public class ClassTree { - - - public static int INITIAL_SIZE = 4; public static float LOAD_FACTOR = 0.8F; diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java index aae25b5..1635f25 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java @@ -44,12 +44,13 @@ class PublisherExact implements Publisher { // Run subscriptions if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication synchrony.publish(subscriptions, message1); } else { // Dead Event must EXACTLY MATCH (no subclasses) final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { synchrony.publish(deadSubscriptions, new DeadMessage(message1)); } @@ -72,12 +73,13 @@ class PublisherExact implements Publisher { // Run subscriptions if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication synchrony.publish(subscriptions, message1, message2); } else { // Dead Event must EXACTLY MATCH (no subclasses) final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2)); } @@ -101,12 +103,13 @@ class PublisherExact implements Publisher { // Run subscriptions if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication synchrony.publish(subscriptions, message1, message2, message3); } else { // Dead Event must EXACTLY MATCH (no subclasses) final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3)); } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java index e8a383b..da7e8f7 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java @@ -47,6 +47,8 @@ class PublisherExactWithSuperTypes implements Publisher { // Run subscriptions final Subscription[] subscriptions = subManager.getSubs(message1Class); // can return null if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(subscriptions, message1); } @@ -54,6 +56,8 @@ class PublisherExactWithSuperTypes implements Publisher { // Run superSubscriptions final Subscription[] superSubscriptions = subManager.getSuperSubs(message1Class); // NOT return null if (superSubscriptions.length > 0) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(superSubscriptions, message1); } @@ -86,14 +90,18 @@ class PublisherExactWithSuperTypes implements Publisher { // Run subscriptions final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(subscriptions, message1, message2); } // Run superSubscriptions - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // can return null - if (superSubscriptions != null) { + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null + if (superSubscriptions.length > 0) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(superSubscriptions, message1, message2); } @@ -127,14 +135,18 @@ class PublisherExactWithSuperTypes implements Publisher { // Run subscriptions final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null if (subscriptions != null) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(subscriptions, message1, message2, message3); } // Run superSubscriptions - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // can return null - if (superSubscriptions != null) { + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null + if (superSubscriptions.length > 0) { + // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async + // nature of publication hasSubs = true; synchrony.publish(superSubscriptions, message1, message2, message3); } diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index 742028d..7d81d52 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -147,19 +147,19 @@ class Subscription { * @return true if messages were published */ public abstract - boolean publish(final Object message) throws Throwable; + void publish(final Object message) throws Throwable; /** * @return true if messages were published */ public abstract - boolean publish(final Object message1, final Object message2) throws Throwable; + void publish(final Object message1, final Object message2) throws Throwable; /** * @return true if messages were published */ public abstract - boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable; + void publish(final Object message1, final Object message2, final Object message3) throws Throwable; @Override diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 9cb8dcd..96700a6 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -20,11 +20,9 @@ import dorkbox.util.messagebus.MessageBus; import dorkbox.util.messagebus.common.ClassTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.MultiClass; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.subscription.asm.SubMakerAsm; import dorkbox.util.messagebus.subscription.reflection.SubMakerReflection; import dorkbox.util.messagebus.utils.ClassUtils; -import dorkbox.util.messagebus.utils.SubscriptionUtils; import java.util.ArrayList; import java.util.Arrays; @@ -41,12 +39,11 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; * @author dorkbox, llc * Date: 2/2/15 */ -@SuppressWarnings("unchecked") +@SuppressWarnings({"unchecked", "ToArrayCallWithZeroLengthArrayArgument"}) public final class SubscriptionManager { public static final float LOAD_FACTOR = 0.8F; - - // TODO: during startup, pre-calculate the number of subscription listeners and x2 to save as subsPerListener expected max size + public static final Subscription[] EMPTY_SUBS = new Subscription[0]; // controls if we use java reflection or ASM to access methods during publication private final SubMaker subMaker; @@ -70,6 +67,7 @@ class SubscriptionManager { // keeps track of all subscriptions of the super classes of a message type. private volatile IdentityMap, Subscription[]> subsSuperSingle; + private volatile IdentityMap subsSuperMulti; // In order to force the "Single writer principle" on subscribe & unsubscribe, they are within WRITE LOCKS. They could be dispatched // to another thread, however we do NOT want them asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is @@ -78,11 +76,6 @@ class SubscriptionManager { - - private final ErrorHandlingSupport errorHandler; - - private final SubscriptionUtils subUtils; - private final ClassTree> classTree; private final ClassUtils classUtils; @@ -105,14 +98,13 @@ class SubscriptionManager { IdentityMap.class, "subsSuperSingle"); -//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to -// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the -// correct type too) + private static final AtomicReferenceFieldUpdater subsSuperMultiREF = + AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, + IdentityMap.class, + "subsSuperSingle"); public - SubscriptionManager(final int numberOfThreads, final ErrorHandlingSupport errorHandler) { - this.errorHandler = errorHandler; - + SubscriptionManager(final int numberOfThreads) { if (MessageBus.useAsmForDispatch) { this.subMaker = new SubMakerAsm(); } @@ -130,12 +122,15 @@ class SubscriptionManager { subsSuperSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR); + subsSuperMulti = new IdentityMap(32, LOAD_FACTOR); classTree = new ClassTree>(); - subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR, numberOfThreads); } + /** + * Shuts down and clears all memory usage by the subscriptions + */ public void shutdown() { @@ -164,6 +159,15 @@ class SubscriptionManager { this.classUtils.shutdown(); } + /** + * Subscribes a specific listener. The infrastructure for subscription never "shrinks", meaning that when a listener is un-subscribed, + * the listeners are only removed from the internal map -- the map itself is not cleaned up until a 'shutdown' is called. + * + * This method uses the "single-writer-principle" for lock-free publication. Since there are only 2 + * methods to guarantee this method can only be called one-at-a-time (either it is only called by one thread, or only one thread can + * access it at a time) -- we chose the 2nd option -- and use a 'synchronized' block to make sure that only one thread can access + * this method at a time. + */ public void subscribe(final Object listener) { final Class listenerClass = listener.getClass(); @@ -340,6 +344,18 @@ class SubscriptionManager { // activates this sub for sub/unsub (only used by the subscription writer thread) subsPerListener.put(listenerClass, subscriptions); + + // dump the super subscriptions + IdentityMap, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this); + superSingleSubs.clear(); + subsSuperSingleREF.lazySet((this), superSingleSubs); + + IdentityMap superMultiSubs = subsSuperMultiREF.get(this); + superMultiSubs.clear(); + subsSuperMultiREF.lazySet((this), superMultiSubs); + + + // save this snapshot back to the original (single writer principle) subsSingleREF.lazySet(this, singleSubs); subsMultiREF.lazySet(this, multiSubs); @@ -362,6 +378,16 @@ class SubscriptionManager { } } + + /** + * Un-subscribes a specific listener. The infrastructure for subscription never "shrinks", meaning that when a listener is un-subscribed, + * the listeners are only removed from the internal map -- the map itself is not cleaned up until a 'shutdown' is called. + * + * This method uses the "single-writer-principle" for lock-free publication. Since there are only 2 + * methods to guarantee this method can only be called one-at-a-time (either it is only called by one thread, or only one thread can + * access it at a time) -- we chose the 2nd option -- and use a 'synchronized' block to make sure that only one thread can access + * this method at a time. + */ public void unsubscribe(final Object listener) { final Class listenerClass = listener.getClass(); @@ -387,75 +413,19 @@ class SubscriptionManager { } } - private - void registerExtraSubs(final Class clazz, - final IdentityMap, Subscription[]> subsPerMessageSingle, - final IdentityMap, Subscription[]> subsPerSuperMessageSingle, - final IdentityMap, Subscription[]> subsPerVarityMessageSingle) { -// final Class arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response - final Class[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response - - Subscription sub; -// -// // Register Varity (Var-Arg) subscriptions -// final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion); -// if (arraySubs != null) { -// final int length = arraySubs.length; -// final ArrayList varArgSubsAsList = new ArrayList(length); -// -// for (int i = 0; i < length; i++) { -// sub = arraySubs[i]; -// -// if (sub.getHandlerAccess().acceptsVarArgs()) { -// varArgSubsAsList.add(sub); -// } -// } -// -// if (!varArgSubsAsList.isEmpty()) { -// subsPerVarityMessageSingle.put(clazz, varArgSubsAsList.toArray(new Subscription[0])); -// } -// } - - -// -// // Register SuperClass subscriptions -// final int length = superClasses.length; -// final ArrayList subsAsList = new ArrayList(length); -// -// // walks through all of the subscriptions that might exist for super types, and if applicable, save them -// for (int i = 0; i < length; i++) { -// final Class superClass = superClasses[i]; -// final Subscription[] superSubs = subsPerMessageSingle.get(superClass); -// -// if (superSubs != null) { -// int superSubLength = superSubs.length; -// for (int j = 0; j < superSubLength; j++) { -// sub = superSubs[j]; -// -// if (sub.getHandlerAccess().acceptsSubtypes()) { -// subsAsList.add(sub); -// } -// } -// } -// } -// -// if (!subsAsList.isEmpty()) { -// // save the subscriptions -// subsPerSuperMessageSingle.put(clazz, subsAsList.toArray(new Subscription[0])); -// } - } - - - - - // can return null + /** + * @return can return null + */ public Subscription[] getSubs(final Class messageClass) { return (Subscription[]) subsSingleREF.get(this).get(messageClass); } - // can return null + + /** + * @return can return null + */ public Subscription[] getSubs(final Class messageClass1, final Class messageClass2) { // never returns null @@ -464,7 +434,9 @@ class SubscriptionManager { return (Subscription[]) subsMultiREF.get(this).get(multiClass); } - // can return null + /** + * @return can return null + */ public Subscription[] getSubs(final Class messageClass1, final Class messageClass2, final Class messageClass3) { // never returns null @@ -474,17 +446,9 @@ class SubscriptionManager { return (Subscription[]) subsMultiREF.get(this).get(multiClass); } - // can return null - public - Subscription[] getSubs(final Class[] messageClasses) { - // never returns null - final MultiClass multiClass = classTree.get(messageClasses); - - return (Subscription[]) subsMultiREF.get(this).get(multiClass); - } - - - // can NOT return null + /** + * @return can NOT return null + */ public Subscription[] getSuperSubs(final Class messageClass) { // The subscriptions that are remembered here DO NOT CHANGE (only the listeners inside them change). @@ -494,7 +458,7 @@ class SubscriptionManager { Subscription[] subscriptions = localSuperSubs.get(messageClass); // the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are - // calculated and do not exist - this will be an empty array. + // calculated and if they do not exist - this will be an empty array. if (subscriptions == null) { final Class[] superClasses = this.classUtils.getSuperClasses(messageClass); // never returns null, cached response @@ -525,7 +489,7 @@ class SubscriptionManager { } // subsAsList now contains ALL of the super-class subscriptions. - subscriptions = subsAsList.toArray(new Subscription[0]); + subscriptions = subsAsList.toArray(EMPTY_SUBS); localSuperSubs.put(messageClass, subscriptions); subsSuperSingleREF.lazySet(this, localSuperSubs); @@ -534,133 +498,165 @@ class SubscriptionManager { return subscriptions; } - // can return null + /** + * @return can NOT return null + */ public Subscription[] getSuperSubs(final Class messageClass1, final Class messageClass2) { // save the subscriptions final Class[] superClasses1 = this.classUtils.getSuperClasses(messageClass1); // never returns null, cached response final Class[] superClasses2 = this.classUtils.getSuperClasses(messageClass2); // never returns null, cached response - final IdentityMap localSubs = subsMultiREF.get(this); + final MultiClass origMultiClass = classTree.get(messageClass1, messageClass2); - Class superClass1; - Class superClass2; - Subscription sub; - Subscription[] superSubs; - boolean hasSubs = false; + IdentityMap localSuperSubs = subsSuperMultiREF.get(this); + Subscription[] subscriptions = localSuperSubs.get(origMultiClass); + // the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are + // calculated and if they do not exist - this will be an empty array. + if (subscriptions == null) { + final IdentityMap localSubs = subsMultiREF.get(this); + + Class superClass1; + Class superClass2; + Subscription sub; + Subscription[] superSubs; - final int length1 = superClasses1.length; - final int length2 = superClasses2.length; + final int length1 = superClasses1.length; + final int length2 = superClasses2.length; - ArrayList subsAsList = new ArrayList(length1 + length2); + ArrayList subsAsList = new ArrayList(length1 + length2); - for (int i = 0; i < length1; i++) { - superClass1 = superClasses1[i]; - - // only go over subtypes - if (superClass1 == messageClass1) { - continue; - } - - for (int j = 0; j < length2; j++) { - superClass2 = superClasses2[j]; + for (int i = 0; i < length1; i++) { + superClass1 = superClasses1[i]; // only go over subtypes - if (superClass2 == messageClass2) { + if (superClass1 == messageClass1) { continue; } - // never returns null - final MultiClass multiClass = classTree.get(superClass1, - superClass2); - superSubs = localSubs.get(multiClass); - if (superSubs != null) { - for (int k = 0; k < superSubs.length; k++) { - sub = superSubs[k]; + for (int j = 0; j < length2; j++) { + superClass2 = superClasses2[j]; - if (sub.getHandler().acceptsSubtypes()) { - subsAsList.add(sub); - hasSubs = true; + // only go over subtypes + if (superClass2 == messageClass2) { + continue; + } + + // never returns null + MultiClass multiClass = classTree.get(superClass1, + superClass2); + + superSubs = localSubs.get(multiClass); + + //noinspection Duplicates + if (superSubs != null) { + for (int k = 0; k < superSubs.length; k++) { + sub = superSubs[k]; + + if (sub.getHandler().acceptsSubtypes()) { + subsAsList.add(sub); + } } } } } + + // subsAsList now contains ALL of the super-class subscriptions. + subscriptions = subsAsList.toArray(EMPTY_SUBS); + localSuperSubs.put(origMultiClass, subscriptions); + + subsSuperMultiREF.lazySet(this, localSuperSubs); } - // subsAsList now contains ALL of the super-class subscriptions. - if (hasSubs) { - return subsAsList.toArray(new Subscription[0]); - } - else { - // TODO: shortcut out if there are no handlers that accept subtypes - return null; - } + return subscriptions; } - - - - // can return null - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { -// ArrayList collection = getSubs(messageClass1, messageClass2); // can return null -// -// // now publish superClasses -// final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, -// this); // NOT return null -// -// if (collection != null) { -// collection = new ArrayList(collection); -// -// if (!superSubs.isEmpty()) { -// collection.addAll(superSubs); -// } -// } -// else if (!superSubs.isEmpty()) { -// collection = superSubs; -// } -// -// if (collection != null) { -// return collection.toArray(new Subscription[0]); -// } -// else { - return null; -// } - } - - // can return null - public - Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { -// -// ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null -// -// // now publish superClasses -// final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, -// this); // NOT return null -// -// if (collection != null) { -// collection = new ArrayList(collection); -// -// if (!superSubs.isEmpty()) { -// collection.addAll(superSubs); -// } -// } -// else if (!superSubs.isEmpty()) { -// collection = superSubs; -// } -// -// if (collection != null) { -// return collection.toArray(new Subscription[0]); -// } -// else { - return null; -// } - } - - + /** + * @return can NOT return null + */ public Subscription[] getSuperSubs(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - return null; + // save the subscriptions + final Class[] superClasses1 = this.classUtils.getSuperClasses(messageClass1); // never returns null, cached response + final Class[] superClasses2 = this.classUtils.getSuperClasses(messageClass2); // never returns null, cached response + final Class[] superClasses3 = this.classUtils.getSuperClasses(messageClass3); // never returns null, cached response + + final MultiClass origMultiClass = classTree.get(messageClass1, messageClass2, messageClass3); + + IdentityMap localSuperSubs = subsSuperMultiREF.get(this); + Subscription[] subscriptions = localSuperSubs.get(origMultiClass); + // the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are + // calculated and if they do not exist - this will be an empty array. + if (subscriptions == null) { + final IdentityMap localSubs = subsMultiREF.get(this); + + + + Class superClass1; + Class superClass2; + Class superClass3; + Subscription sub; + Subscription[] superSubs; + + final int length1 = superClasses1.length; + final int length2 = superClasses2.length; + final int length3 = superClasses3.length; + + ArrayList subsAsList = new ArrayList(length1 + length2); + + for (int i = 0; i < length1; i++) { + superClass1 = superClasses1[i]; + + // only go over subtypes + if (superClass1 == messageClass1) { + continue; + } + + for (int j = 0; j < length2; j++) { + superClass2 = superClasses2[j]; + + // only go over subtypes + if (superClass2 == messageClass2) { + continue; + } + + for (int k = 0; k < length3; k++) { + superClass3 = superClasses3[j]; + + // only go over subtypes + if (superClass3 == messageClass3) { + continue; + } + + // never returns null + MultiClass multiClass = classTree.get(superClass1, + superClass2, + superClass3); + + superSubs = localSubs.get(multiClass); + + //noinspection Duplicates + if (superSubs != null) { + for (int m = 0; m < superSubs.length; m++) { + sub = superSubs[m]; + + if (sub.getHandler().acceptsSubtypes()) { + subsAsList.add(sub); + } + } + } + } + } + } + + // subsAsList now contains ALL of the super-class subscriptions. + subscriptions = subsAsList.toArray(EMPTY_SUBS); + localSuperSubs.put(origMultiClass, subscriptions); + + subsSuperMultiREF.lazySet(this, localSuperSubs); + } + + return subscriptions; } } diff --git a/src/dorkbox/util/messagebus/subscription/asm/SubscriptionAsm.java b/src/dorkbox/util/messagebus/subscription/asm/SubscriptionAsm.java index 938a1cc..81ad34c 100644 --- a/src/dorkbox/util/messagebus/subscription/asm/SubscriptionAsm.java +++ b/src/dorkbox/util/messagebus/subscription/asm/SubscriptionAsm.java @@ -90,68 +90,56 @@ class SubscriptionAsm extends Subscription { * @return true if messages were published */ public - boolean publish(final Object message) throws Throwable { + void publish(final Object message) throws Throwable { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, handler, handleIndex, message); } - - return hasSubs; } /** * @return true if messages were published */ public - boolean publish(final Object message1, final Object message2) throws Throwable { + void publish(final Object message1, final Object message2) throws Throwable { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, handler, handleIndex, message1, message2); } - - return hasSubs; } /** * @return true if messages were published */ public - boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable { + void publish(final Object message1, final Object message2, final Object message3) throws Throwable { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, handler, handleIndex, message1, message2, message3); } - - return hasSubs; } } diff --git a/src/dorkbox/util/messagebus/subscription/reflection/SubscriptionReflection.java b/src/dorkbox/util/messagebus/subscription/reflection/SubscriptionReflection.java index dfceea0..7858319 100644 --- a/src/dorkbox/util/messagebus/subscription/reflection/SubscriptionReflection.java +++ b/src/dorkbox/util/messagebus/subscription/reflection/SubscriptionReflection.java @@ -83,65 +83,53 @@ class SubscriptionReflection extends Subscription { * @return true if messages were published */ public - boolean publish(final Object message) throws Throwable { + void publish(final Object message) throws Throwable { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, method, message); } - - return hasSubs; } /** * @return true if messages were published */ public - boolean publish(final Object message1, final Object message2) throws Throwable { + void publish(final Object message1, final Object message2) throws Throwable { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, method, message1, message2); } - - return hasSubs; } /** * @return true if messages were published */ public - boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable { + void publish(final Object message1, final Object message2, final Object message3) throws Throwable { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - boolean hasSubs = false; Entry current = headREF.get(this); Object listener; while (current != null) { - hasSubs = true; listener = current.getValue(); current = current.next(); invocation.invoke(listener, method, message1, message2, message3); } - - return hasSubs; } } diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java index c5d5136..5f6c46a 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java @@ -34,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue; * * @author dorkbox, llc Date: 2/3/16 */ -public +public final class AsyncABQ implements Synchrony { private final ArrayBlockingQueue dispatchQueue; @@ -147,8 +147,7 @@ class AsyncABQ implements Synchrony { } } - // unfortunately, this isn't as friendly to GC as the disruptor is... - + @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { MessageHolder take = new MessageHolder(); @@ -162,13 +161,26 @@ class AsyncABQ implements Synchrony { @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { - this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2)); + MessageHolder take = new MessageHolder(); + take.type = MessageType.TWO; + take.subscriptions = subscriptions; + take.message1 = message1; + take.message2 = message2; + + this.dispatchQueue.put(take); } @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { - this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2, message3)); + MessageHolder take = new MessageHolder(); + take.type = MessageType.THREE; + take.subscriptions = subscriptions; + take.message1 = message1; + take.message2 = message2; + take.message3 = message3; + + this.dispatchQueue.put(take); } public diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java index 2567167..1a43930 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java @@ -34,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue; * * @author dorkbox, llc Date: 2/3/16 */ -public +public final class AsyncABQ_noGc implements Synchrony { private final ArrayBlockingQueue dispatchQueue; @@ -162,8 +162,7 @@ class AsyncABQ_noGc implements Synchrony { } } - // unfortunately, this isn't as friendly to GC as the disruptor is... - + @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { MessageHolder take = gcQueue.take(); @@ -177,13 +176,26 @@ class AsyncABQ_noGc implements Synchrony { @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { - this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2)); + MessageHolder take = gcQueue.take(); + take.type = MessageType.TWO; + take.subscriptions = subscriptions; + take.message1 = message1; + take.message2 = message2; + + this.dispatchQueue.put(take); } @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { - this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2, message3)); + MessageHolder take = gcQueue.take(); + take.type = MessageType.THREE; + take.subscriptions = subscriptions; + take.message1 = message1; + take.message2 = message2; + take.message3 = message3; + + this.dispatchQueue.put(take); } public diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java index 291c5b4..869c41a 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java @@ -40,7 +40,7 @@ import java.util.concurrent.locks.LockSupport; /** * @author dorkbox, llc Date: 2/3/16 */ -public +public final class AsyncDisruptor implements Synchrony { private final ErrorHandlingSupport errorHandler; @@ -122,7 +122,7 @@ class AsyncDisruptor implements Synchrony { } } - + @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { long seq = ringBuffer.next(); @@ -138,13 +138,30 @@ class AsyncDisruptor implements Synchrony { @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + long seq = ringBuffer.next(); + MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.TWO; + job.subscriptions = subscriptions; + job.message1 = message1; + job.message2 = message2; + + ringBuffer.publish(seq); } @Override public void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + long seq = ringBuffer.next(); + MessageHolder job = ringBuffer.get(seq); + job.type = MessageType.THREE; + job.subscriptions = subscriptions; + job.message1 = message1; + job.message3 = message2; + job.message2 = message3; + + ringBuffer.publish(seq); } // gets the sequences used for processing work diff --git a/src/dorkbox/util/messagebus/synchrony/Sync.java b/src/dorkbox/util/messagebus/synchrony/Sync.java index d4d9f91..53fbaea 100644 --- a/src/dorkbox/util/messagebus/synchrony/Sync.java +++ b/src/dorkbox/util/messagebus/synchrony/Sync.java @@ -20,12 +20,11 @@ import dorkbox.util.messagebus.subscription.Subscription; /** * @author dorkbox, llc Date: 2/2/15 */ -public +public final class Sync implements Synchrony { public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { Subscription sub; - boolean hasSubs = false; for (int i = 0; i < subscriptions.length; i++) { sub = subscriptions[i]; sub.publish(message1); @@ -55,13 +54,11 @@ class Sync implements Synchrony { @Override public void start() { - } @Override public void shutdown() { - } @Override diff --git a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java deleted file mode 100644 index c17d9fc..0000000 --- a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.utils; - -import com.esotericsoftware.kryo.util.IdentityMap; -import dorkbox.util.messagebus.common.ClassTree; -import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionManager; - -import java.util.ArrayList; - -public final -class SubscriptionUtils { - private final ClassUtils superClass; - - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers - // it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one - - // keeps track of all subscriptions of the super classes of a message type. - private volatile IdentityMap, Subscription[]> superClassSubscriptions; - private final ClassTree> superClassSubscriptionsMulti; - - - public - SubscriptionUtils(final ClassUtils superClass, final float loadFactor, final int numberOfThreads) { - this.superClass = superClass; - - - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = new IdentityMap, Subscription[]>(8, loadFactor); - this.superClassSubscriptionsMulti = new ClassTree>(); - } - - public - void clear() { - this.superClassSubscriptions.clear(); - this.superClassSubscriptionsMulti.clear(); - } - -// // ALWAYS register and create a cached version of the requested class + superClasses -// // ONLY called during subscribe -// public -// Subscription[] register(final Class clazz, final SubscriptionManager subManager) { -// final IdentityMap, Subscription[]> local = this.superClassSubscriptions; -// -// // types was not empty, so collect subscriptions for each type and collate them -// -// // save the subscriptions -// final Class[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response -// -// Class superClass; -// Subscription[] superSubs; -// Subscription sub; -// -// final int length = superClasses.length; -// int superSubLength; -// final ArrayList subsAsList = new ArrayList(length); -// -// for (int i = 0; i < length; i++) { -// superClass = superClasses[i]; -// superSubs = subManager.getExactAsArray(superClass); -// -// if (superSubs != null) { -// superSubLength = superSubs.length; -// for (int j = 0; j < superSubLength; j++) { -// sub = superSubs[j]; -// -// if (sub.getHandlerAccess().acceptsSubtypes()) { -// subsAsList.add(sub); -// } -// } -// } -// } -// -// final int size = subsAsList.size(); -// if (size > 0) { -// Subscription[] subs = new Subscription[size]; -// subsAsList.toArray(subs); -// local.put(clazz, subs); -// -// superClassSubscriptions = local; -// -// return subs; -// } -// -// return null; -// } - - /** - * Returns an array COPY of the super subscriptions for the specified type. - *

- * This ALSO checks to see if the superClass accepts subtypes. - * - * @return CAN RETURN NULL - */ - public - Subscription[] getSuperSubscriptions(final Class clazz) { - // whenever our subscriptions change, this map is cleared. - return this.superClassSubscriptions.get(clazz); - } - - /** - * Returns an array COPY of the super subscriptions for the specified type. - *

- * This ALSO checks to see if the superClass accepts subtypes. - * - * @return CAN NOT RETURN NULL - */ - public - ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final SubscriptionManager subManager) { -// // whenever our subscriptions change, this map is cleared. -// final MapTree, ArrayList> cached = this.superClassSubscriptionsMulti; -// -// ArrayList subs = cached.get(clazz1, clazz2); -// -// if (subs == null) { -// // types was not empty, so collect subscriptions for each type and collate them -// -// // save the subscriptions -// final Class[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response -// final Class[] superClasses2 = this.superClass.getSuperClasses(clazz2); // never returns null, cached response -// -// Class superClass1; -// Class superClass2; -// ArrayList superSubs; -// Subscription sub; -// -// final int length1 = superClasses1.length; -// final int length2 = superClasses2.length; -// -// subs = new ArrayList(length1 + length2); -// -// for (int i = 0; i < length1; i++) { -// superClass1 = superClasses1[i]; -// -// // only go over subtypes -// if (superClass1 == clazz1) { -// continue; -// } -// -// for (int j = 0; j < length2; j++) { -// superClass2 = superClasses2[j]; -// -// // only go over subtypes -// if (superClass2 == clazz2) { -// continue; -// } -// -// superSubs = subManager.getExactAsArray(superClass1, superClass2); -// if (superSubs != null) { -// for (int k = 0; k < superSubs.size(); k++) { -// sub = superSubs.get(k); -// -// if (sub.getHandlerAccess().acceptsSubtypes()) { -// subs.add(sub); -// } -// } -// } -// } -// } -// subs.trimToSize(); -// cached.put(subs, clazz1, clazz2); -// } - -// return subs; - return null; - } - - /** - * Returns an array COPY of the super subscriptions for the specified type. - *

- * This ALSO checks to see if the superClass accepts subtypes. - * - * @return CAN NOT RETURN NULL - */ - public - ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Class clazz3, - final SubscriptionManager subManager) { -// // whenever our subscriptions change, this map is cleared. -// final MapTree, ArrayList> local = this.superClassSubscriptionsMulti; -// -// ArrayList subs = local.get(clazz1, clazz2, clazz3); -// -// if (subs == null) { -// // types was not empty, so collect subscriptions for each type and collate them -// -// // save the subscriptions -// final Class[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response -// final Class[] superClasses2 = this.superClass.getSuperClasses(clazz2); // never returns null, cached response -// final Class[] superClasses3 = this.superClass.getSuperClasses(clazz3); // never returns null, cached response -// -// Class superClass1; -// Class superClass2; -// Class superClass3; -// ArrayList superSubs; -// Subscription sub; -// -// final int length1 = superClasses1.length; -// final int length2 = superClasses2.length; -// final int length3 = superClasses3.length; -// -// subs = new ArrayList(length1 + length2 + length3); -// -// for (int i = 0; i < length1; i++) { -// superClass1 = superClasses1[i]; -// -// // only go over subtypes -// if (superClass1 == clazz1) { -// continue; -// } -// -// for (int j = 0; j < length2; j++) { -// superClass2 = superClasses2[j]; -// -// // only go over subtypes -// if (superClass2 == clazz2) { -// continue; -// } -// -// for (int k = 0; k < length3; k++) { -// superClass3 = superClasses3[j]; -// -// // only go over subtypes -// if (superClass3 == clazz3) { -// continue; -// } -// -// superSubs = subManager.getExactAsArray(superClass1, superClass2, superClass3); -// if (superSubs != null) { -// for (int m = 0; m < superSubs.size(); m++) { -// sub = superSubs.get(m); -// -// if (sub.getHandlerAccess().acceptsSubtypes()) { -// subs.add(sub); -// } -// } -// } -// } -// } -// } -// subs.trimToSize(); -// local.put(subs, clazz1, clazz2, clazz3); -// } -// -// return subs; - return null; - } -} diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index f453bf9..2dba1d7 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -220,7 +220,7 @@ public class SubscriptionManagerTest extends AssertSupport { ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class); final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler); + final SubscriptionManager subscriptionManager = new SubscriptionManager(1); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); @@ -247,7 +247,7 @@ public class SubscriptionManagerTest extends AssertSupport { private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) { final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); - final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler); + final SubscriptionManager subscriptionManager = new SubscriptionManager(1); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);