Cleaned up multi-subs and multi-super-subs

This commit is contained in:
nathan 2016-02-06 21:05:37 +01:00
parent c323e29287
commit 7e262d1f0c
14 changed files with 279 additions and 520 deletions

View File

@ -118,7 +118,7 @@ class MessageBus implements IMessageBus {
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
switch (publishMode) {
case Exact:

View File

@ -35,9 +35,6 @@ import java.util.concurrent.atomic.AtomicReference;
* Date: 2/2/15
*/
public class ClassTree<KEY> {
public static int INITIAL_SIZE = 4;
public static float LOAD_FACTOR = 0.8F;

View File

@ -44,12 +44,13 @@ class PublisherExact implements Publisher {
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}
@ -72,12 +73,13 @@ class PublisherExact implements Publisher {
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1, message2);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2));
}
@ -101,12 +103,13 @@ class PublisherExact implements Publisher {
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1, message2, message3);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3));
}

View File

@ -47,6 +47,8 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run subscriptions
final Subscription[] subscriptions = subManager.getSubs(message1Class); // can return null
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1);
}
@ -54,6 +56,8 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(message1Class); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1);
}
@ -86,14 +90,18 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run subscriptions
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1, message2);
}
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // can return null
if (superSubscriptions != null) {
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1, message2);
}
@ -127,14 +135,18 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run subscriptions
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1, message2, message3);
}
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // can return null
if (superSubscriptions != null) {
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1, message2, message3);
}

View File

@ -147,19 +147,19 @@ class Subscription {
* @return true if messages were published
*/
public abstract
boolean publish(final Object message) throws Throwable;
void publish(final Object message) throws Throwable;
/**
* @return true if messages were published
*/
public abstract
boolean publish(final Object message1, final Object message2) throws Throwable;
void publish(final Object message1, final Object message2) throws Throwable;
/**
* @return true if messages were published
*/
public abstract
boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable;
void publish(final Object message1, final Object message2, final Object message3) throws Throwable;
@Override

View File

@ -20,11 +20,9 @@ import dorkbox.util.messagebus.MessageBus;
import dorkbox.util.messagebus.common.ClassTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.MultiClass;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.subscription.asm.SubMakerAsm;
import dorkbox.util.messagebus.subscription.reflection.SubMakerReflection;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,12 +39,11 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* @author dorkbox, llc
* Date: 2/2/15
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "ToArrayCallWithZeroLengthArrayArgument"})
public final
class SubscriptionManager {
public static final float LOAD_FACTOR = 0.8F;
// TODO: during startup, pre-calculate the number of subscription listeners and x2 to save as subsPerListener expected max size
public static final Subscription[] EMPTY_SUBS = new Subscription[0];
// controls if we use java reflection or ASM to access methods during publication
private final SubMaker subMaker;
@ -70,6 +67,7 @@ class SubscriptionManager {
// keeps track of all subscriptions of the super classes of a message type.
private volatile IdentityMap<Class<?>, Subscription[]> subsSuperSingle;
private volatile IdentityMap<MultiClass, Subscription[]> subsSuperMulti;
// In order to force the "Single writer principle" on subscribe & unsubscribe, they are within WRITE LOCKS. They could be dispatched
// to another thread, however we do NOT want them asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is
@ -78,11 +76,6 @@ class SubscriptionManager {
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
private final ClassTree<Class<?>> classTree;
private final ClassUtils classUtils;
@ -105,14 +98,13 @@ class SubscriptionManager {
IdentityMap.class,
"subsSuperSingle");
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
// correct type too)
private static final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsSuperMultiREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
"subsSuperSingle");
public
SubscriptionManager(final int numberOfThreads, final ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
SubscriptionManager(final int numberOfThreads) {
if (MessageBus.useAsmForDispatch) {
this.subMaker = new SubMakerAsm();
}
@ -130,12 +122,15 @@ class SubscriptionManager {
subsSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
subsSuperMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
classTree = new ClassTree<Class<?>>();
subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR, numberOfThreads);
}
/**
* Shuts down and clears all memory usage by the subscriptions
*/
public
void shutdown() {
@ -164,6 +159,15 @@ class SubscriptionManager {
this.classUtils.shutdown();
}
/**
* Subscribes a specific listener. The infrastructure for subscription never "shrinks", meaning that when a listener is un-subscribed,
* the listeners are only removed from the internal map -- the map itself is not cleaned up until a 'shutdown' is called.
*
* This method uses the "single-writer-principle" for lock-free publication. Since there are only 2
* methods to guarantee this method can only be called one-at-a-time (either it is only called by one thread, or only one thread can
* access it at a time) -- we chose the 2nd option -- and use a 'synchronized' block to make sure that only one thread can access
* this method at a time.
*/
public
void subscribe(final Object listener) {
final Class<?> listenerClass = listener.getClass();
@ -340,6 +344,18 @@ class SubscriptionManager {
// activates this sub for sub/unsub (only used by the subscription writer thread)
subsPerListener.put(listenerClass, subscriptions);
// dump the super subscriptions
IdentityMap<Class<?>, Subscription[]> superSingleSubs = subsSuperSingleREF.get(this);
superSingleSubs.clear();
subsSuperSingleREF.lazySet((this), superSingleSubs);
IdentityMap<MultiClass, Subscription[]> superMultiSubs = subsSuperMultiREF.get(this);
superMultiSubs.clear();
subsSuperMultiREF.lazySet((this), superMultiSubs);
// save this snapshot back to the original (single writer principle)
subsSingleREF.lazySet(this, singleSubs);
subsMultiREF.lazySet(this, multiSubs);
@ -362,6 +378,16 @@ class SubscriptionManager {
}
}
/**
* Un-subscribes a specific listener. The infrastructure for subscription never "shrinks", meaning that when a listener is un-subscribed,
* the listeners are only removed from the internal map -- the map itself is not cleaned up until a 'shutdown' is called.
*
* This method uses the "single-writer-principle" for lock-free publication. Since there are only 2
* methods to guarantee this method can only be called one-at-a-time (either it is only called by one thread, or only one thread can
* access it at a time) -- we chose the 2nd option -- and use a 'synchronized' block to make sure that only one thread can access
* this method at a time.
*/
public
void unsubscribe(final Object listener) {
final Class<?> listenerClass = listener.getClass();
@ -387,75 +413,19 @@ class SubscriptionManager {
}
}
private
void registerExtraSubs(final Class<?> clazz,
final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle,
final IdentityMap<Class<?>, Subscription[]> subsPerSuperMessageSingle,
final IdentityMap<Class<?>, Subscription[]> subsPerVarityMessageSingle) {
// final Class<?> arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response
final Class<?>[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response
Subscription sub;
//
// // Register Varity (Var-Arg) subscriptions
// final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion);
// if (arraySubs != null) {
// final int length = arraySubs.length;
// final ArrayList<Subscription> varArgSubsAsList = new ArrayList<Subscription>(length);
//
// for (int i = 0; i < length; i++) {
// sub = arraySubs[i];
//
// if (sub.getHandlerAccess().acceptsVarArgs()) {
// varArgSubsAsList.add(sub);
// }
// }
//
// if (!varArgSubsAsList.isEmpty()) {
// subsPerVarityMessageSingle.put(clazz, varArgSubsAsList.toArray(new Subscription[0]));
// }
// }
//
// // Register SuperClass subscriptions
// final int length = superClasses.length;
// final ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length);
//
// // walks through all of the subscriptions that might exist for super types, and if applicable, save them
// for (int i = 0; i < length; i++) {
// final Class<?> superClass = superClasses[i];
// final Subscription[] superSubs = subsPerMessageSingle.get(superClass);
//
// if (superSubs != null) {
// int superSubLength = superSubs.length;
// for (int j = 0; j < superSubLength; j++) {
// sub = superSubs[j];
//
// if (sub.getHandlerAccess().acceptsSubtypes()) {
// subsAsList.add(sub);
// }
// }
// }
// }
//
// if (!subsAsList.isEmpty()) {
// // save the subscriptions
// subsPerSuperMessageSingle.put(clazz, subsAsList.toArray(new Subscription[0]));
// }
}
// can return null
/**
* @return can return null
*/
public
Subscription[] getSubs(final Class<?> messageClass) {
return (Subscription[]) subsSingleREF.get(this).get(messageClass);
}
// can return null
/**
* @return can return null
*/
public
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2) {
// never returns null
@ -464,7 +434,9 @@ class SubscriptionManager {
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
}
// can return null
/**
* @return can return null
*/
public
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
// never returns null
@ -474,17 +446,9 @@ class SubscriptionManager {
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
}
// can return null
public
Subscription[] getSubs(final Class<?>[] messageClasses) {
// never returns null
final MultiClass multiClass = classTree.get(messageClasses);
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
}
// can NOT return null
/**
* @return can NOT return null
*/
public
Subscription[] getSuperSubs(final Class<?> messageClass) {
// The subscriptions that are remembered here DO NOT CHANGE (only the listeners inside them change).
@ -494,7 +458,7 @@ class SubscriptionManager {
Subscription[] subscriptions = localSuperSubs.get(messageClass);
// the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are
// calculated and do not exist - this will be an empty array.
// calculated and if they do not exist - this will be an empty array.
if (subscriptions == null) {
final Class<?>[] superClasses = this.classUtils.getSuperClasses(messageClass); // never returns null, cached response
@ -525,7 +489,7 @@ class SubscriptionManager {
}
// subsAsList now contains ALL of the super-class subscriptions.
subscriptions = subsAsList.toArray(new Subscription[0]);
subscriptions = subsAsList.toArray(EMPTY_SUBS);
localSuperSubs.put(messageClass, subscriptions);
subsSuperSingleREF.lazySet(this, localSuperSubs);
@ -534,133 +498,165 @@ class SubscriptionManager {
return subscriptions;
}
// can return null
/**
* @return can NOT return null
*/
public
Subscription[] getSuperSubs(final Class<?> messageClass1, final Class<?> messageClass2) {
// save the subscriptions
final Class<?>[] superClasses1 = this.classUtils.getSuperClasses(messageClass1); // never returns null, cached response
final Class<?>[] superClasses2 = this.classUtils.getSuperClasses(messageClass2); // never returns null, cached response
final IdentityMap<MultiClass, Subscription[]> localSubs = subsMultiREF.get(this);
final MultiClass origMultiClass = classTree.get(messageClass1, messageClass2);
Class<?> superClass1;
Class<?> superClass2;
Subscription sub;
Subscription[] superSubs;
boolean hasSubs = false;
IdentityMap<MultiClass, Subscription[]> localSuperSubs = subsSuperMultiREF.get(this);
Subscription[] subscriptions = localSuperSubs.get(origMultiClass);
// the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are
// calculated and if they do not exist - this will be an empty array.
if (subscriptions == null) {
final IdentityMap<MultiClass, Subscription[]> localSubs = subsMultiREF.get(this);
Class<?> superClass1;
Class<?> superClass2;
Subscription sub;
Subscription[] superSubs;
final int length1 = superClasses1.length;
final int length2 = superClasses2.length;
final int length1 = superClasses1.length;
final int length2 = superClasses2.length;
ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length1 + length2);
ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length1 + length2);
for (int i = 0; i < length1; i++) {
superClass1 = superClasses1[i];
// only go over subtypes
if (superClass1 == messageClass1) {
continue;
}
for (int j = 0; j < length2; j++) {
superClass2 = superClasses2[j];
for (int i = 0; i < length1; i++) {
superClass1 = superClasses1[i];
// only go over subtypes
if (superClass2 == messageClass2) {
if (superClass1 == messageClass1) {
continue;
}
// never returns null
final MultiClass multiClass = classTree.get(superClass1,
superClass2);
superSubs = localSubs.get(multiClass);
if (superSubs != null) {
for (int k = 0; k < superSubs.length; k++) {
sub = superSubs[k];
for (int j = 0; j < length2; j++) {
superClass2 = superClasses2[j];
if (sub.getHandler().acceptsSubtypes()) {
subsAsList.add(sub);
hasSubs = true;
// only go over subtypes
if (superClass2 == messageClass2) {
continue;
}
// never returns null
MultiClass multiClass = classTree.get(superClass1,
superClass2);
superSubs = localSubs.get(multiClass);
//noinspection Duplicates
if (superSubs != null) {
for (int k = 0; k < superSubs.length; k++) {
sub = superSubs[k];
if (sub.getHandler().acceptsSubtypes()) {
subsAsList.add(sub);
}
}
}
}
}
// subsAsList now contains ALL of the super-class subscriptions.
subscriptions = subsAsList.toArray(EMPTY_SUBS);
localSuperSubs.put(origMultiClass, subscriptions);
subsSuperMultiREF.lazySet(this, localSuperSubs);
}
// subsAsList now contains ALL of the super-class subscriptions.
if (hasSubs) {
return subsAsList.toArray(new Subscription[0]);
}
else {
// TODO: shortcut out if there are no handlers that accept subtypes
return null;
}
return subscriptions;
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
// ArrayList<Subscription> collection = getSubs(messageClass1, messageClass2); // can return null
//
// // now publish superClasses
// final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
// this); // NOT return null
//
// if (collection != null) {
// collection = new ArrayList<Subscription>(collection);
//
// if (!superSubs.isEmpty()) {
// collection.addAll(superSubs);
// }
// }
// else if (!superSubs.isEmpty()) {
// collection = superSubs;
// }
//
// if (collection != null) {
// return collection.toArray(new Subscription[0]);
// }
// else {
return null;
// }
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
//
// ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
//
// // now publish superClasses
// final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
// this); // NOT return null
//
// if (collection != null) {
// collection = new ArrayList<Subscription>(collection);
//
// if (!superSubs.isEmpty()) {
// collection.addAll(superSubs);
// }
// }
// else if (!superSubs.isEmpty()) {
// collection = superSubs;
// }
//
// if (collection != null) {
// return collection.toArray(new Subscription[0]);
// }
// else {
return null;
// }
}
/**
* @return can NOT return null
*/
public
Subscription[] getSuperSubs(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return null;
// save the subscriptions
final Class<?>[] superClasses1 = this.classUtils.getSuperClasses(messageClass1); // never returns null, cached response
final Class<?>[] superClasses2 = this.classUtils.getSuperClasses(messageClass2); // never returns null, cached response
final Class<?>[] superClasses3 = this.classUtils.getSuperClasses(messageClass3); // never returns null, cached response
final MultiClass origMultiClass = classTree.get(messageClass1, messageClass2, messageClass3);
IdentityMap<MultiClass, Subscription[]> localSuperSubs = subsSuperMultiREF.get(this);
Subscription[] subscriptions = localSuperSubs.get(origMultiClass);
// the only time this is null, is when subscriptions DO NOT exist, and they haven't been calculated. Otherwise, if they are
// calculated and if they do not exist - this will be an empty array.
if (subscriptions == null) {
final IdentityMap<MultiClass, Subscription[]> localSubs = subsMultiREF.get(this);
Class<?> superClass1;
Class<?> superClass2;
Class<?> superClass3;
Subscription sub;
Subscription[] superSubs;
final int length1 = superClasses1.length;
final int length2 = superClasses2.length;
final int length3 = superClasses3.length;
ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length1 + length2);
for (int i = 0; i < length1; i++) {
superClass1 = superClasses1[i];
// only go over subtypes
if (superClass1 == messageClass1) {
continue;
}
for (int j = 0; j < length2; j++) {
superClass2 = superClasses2[j];
// only go over subtypes
if (superClass2 == messageClass2) {
continue;
}
for (int k = 0; k < length3; k++) {
superClass3 = superClasses3[j];
// only go over subtypes
if (superClass3 == messageClass3) {
continue;
}
// never returns null
MultiClass multiClass = classTree.get(superClass1,
superClass2,
superClass3);
superSubs = localSubs.get(multiClass);
//noinspection Duplicates
if (superSubs != null) {
for (int m = 0; m < superSubs.length; m++) {
sub = superSubs[m];
if (sub.getHandler().acceptsSubtypes()) {
subsAsList.add(sub);
}
}
}
}
}
}
// subsAsList now contains ALL of the super-class subscriptions.
subscriptions = subsAsList.toArray(EMPTY_SUBS);
localSuperSubs.put(origMultiClass, subscriptions);
subsSuperMultiREF.lazySet(this, localSuperSubs);
}
return subscriptions;
}
}

View File

@ -90,68 +90,56 @@ class SubscriptionAsm extends Subscription {
* @return true if messages were published
*/
public
boolean publish(final Object message) throws Throwable {
void publish(final Object message) throws Throwable {
final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, handler, handleIndex, message);
}
return hasSubs;
}
/**
* @return true if messages were published
*/
public
boolean publish(final Object message1, final Object message2) throws Throwable {
void publish(final Object message1, final Object message2) throws Throwable {
final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2);
}
return hasSubs;
}
/**
* @return true if messages were published
*/
public
boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable {
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
}
return hasSubs;
}
}

View File

@ -83,65 +83,53 @@ class SubscriptionReflection extends Subscription {
* @return true if messages were published
*/
public
boolean publish(final Object message) throws Throwable {
void publish(final Object message) throws Throwable {
final Method method = this.method;
final ReflectionInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, method, message);
}
return hasSubs;
}
/**
* @return true if messages were published
*/
public
boolean publish(final Object message1, final Object message2) throws Throwable {
void publish(final Object message1, final Object message2) throws Throwable {
final Method method = this.method;
final ReflectionInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, method, message1, message2);
}
return hasSubs;
}
/**
* @return true if messages were published
*/
public
boolean publish(final Object message1, final Object message2, final Object message3) throws Throwable {
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
final Method method = this.method;
final ReflectionInvocation invocation = this.invocation;
boolean hasSubs = false;
Entry current = headREF.get(this);
Object listener;
while (current != null) {
hasSubs = true;
listener = current.getValue();
current = current.next();
invocation.invoke(listener, method, message1, message2, message3);
}
return hasSubs;
}
}

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue;
*
* @author dorkbox, llc Date: 2/3/16
*/
public
public final
class AsyncABQ implements Synchrony {
private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
@ -147,8 +147,7 @@ class AsyncABQ implements Synchrony {
}
}
// unfortunately, this isn't as friendly to GC as the disruptor is...
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
MessageHolder take = new MessageHolder();
@ -162,13 +161,26 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2));
MessageHolder take = new MessageHolder();
take.type = MessageType.TWO;
take.subscriptions = subscriptions;
take.message1 = message1;
take.message2 = message2;
this.dispatchQueue.put(take);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2, message3));
MessageHolder take = new MessageHolder();
take.type = MessageType.THREE;
take.subscriptions = subscriptions;
take.message1 = message1;
take.message2 = message2;
take.message3 = message3;
this.dispatchQueue.put(take);
}
public

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue;
*
* @author dorkbox, llc Date: 2/3/16
*/
public
public final
class AsyncABQ_noGc implements Synchrony {
private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
@ -162,8 +162,7 @@ class AsyncABQ_noGc implements Synchrony {
}
}
// unfortunately, this isn't as friendly to GC as the disruptor is...
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
MessageHolder take = gcQueue.take();
@ -177,13 +176,26 @@ class AsyncABQ_noGc implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2));
MessageHolder take = gcQueue.take();
take.type = MessageType.TWO;
take.subscriptions = subscriptions;
take.message1 = message1;
take.message2 = message2;
this.dispatchQueue.put(take);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
this.dispatchQueue.put(new MessageHolder(subscriptions, message1, message2, message3));
MessageHolder take = gcQueue.take();
take.type = MessageType.THREE;
take.subscriptions = subscriptions;
take.message1 = message1;
take.message2 = message2;
take.message3 = message3;
this.dispatchQueue.put(take);
}
public

View File

@ -40,7 +40,7 @@ import java.util.concurrent.locks.LockSupport;
/**
* @author dorkbox, llc Date: 2/3/16
*/
public
public final
class AsyncDisruptor implements Synchrony {
private final ErrorHandlingSupport errorHandler;
@ -122,7 +122,7 @@ class AsyncDisruptor implements Synchrony {
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
long seq = ringBuffer.next();
@ -138,13 +138,30 @@ class AsyncDisruptor implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.TWO;
job.subscriptions = subscriptions;
job.message1 = message1;
job.message2 = message2;
ringBuffer.publish(seq);
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.THREE;
job.subscriptions = subscriptions;
job.message1 = message1;
job.message3 = message2;
job.message2 = message3;
ringBuffer.publish(seq);
}
// gets the sequences used for processing work

View File

@ -20,12 +20,11 @@ import dorkbox.util.messagebus.subscription.Subscription;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public
public final
class Sync implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
Subscription sub;
boolean hasSubs = false;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
@ -55,13 +54,11 @@ class Sync implements Synchrony {
@Override
public
void start() {
}
@Override
public
void shutdown() {
}
@Override

View File

@ -1,263 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.utils;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.util.messagebus.common.ClassTree;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayList;
public final
class SubscriptionUtils {
private final ClassUtils superClass;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one
// keeps track of all subscriptions of the super classes of a message type.
private volatile IdentityMap<Class<?>, Subscription[]> superClassSubscriptions;
private final ClassTree<Class<?>> superClassSubscriptionsMulti;
public
SubscriptionUtils(final ClassUtils superClass, final float loadFactor, final int numberOfThreads) {
this.superClass = superClass;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new IdentityMap<Class<?>, Subscription[]>(8, loadFactor);
this.superClassSubscriptionsMulti = new ClassTree<Class<?>>();
}
public
void clear() {
this.superClassSubscriptions.clear();
this.superClassSubscriptionsMulti.clear();
}
// // ALWAYS register and create a cached version of the requested class + superClasses
// // ONLY called during subscribe
// public
// Subscription[] register(final Class<?> clazz, final SubscriptionManager subManager) {
// final IdentityMap<Class<?>, Subscription[]> local = this.superClassSubscriptions;
//
// // types was not empty, so collect subscriptions for each type and collate them
//
// // save the subscriptions
// final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
//
// Class<?> superClass;
// Subscription[] superSubs;
// Subscription sub;
//
// final int length = superClasses.length;
// int superSubLength;
// final ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length);
//
// for (int i = 0; i < length; i++) {
// superClass = superClasses[i];
// superSubs = subManager.getExactAsArray(superClass);
//
// if (superSubs != null) {
// superSubLength = superSubs.length;
// for (int j = 0; j < superSubLength; j++) {
// sub = superSubs[j];
//
// if (sub.getHandlerAccess().acceptsSubtypes()) {
// subsAsList.add(sub);
// }
// }
// }
// }
//
// final int size = subsAsList.size();
// if (size > 0) {
// Subscription[] subs = new Subscription[size];
// subsAsList.toArray(subs);
// local.put(clazz, subs);
//
// superClassSubscriptions = local;
//
// return subs;
// }
//
// return null;
// }
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
*
* @return CAN RETURN NULL
*/
public
Subscription[] getSuperSubscriptions(final Class<?> clazz) {
// whenever our subscriptions change, this map is cleared.
return this.superClassSubscriptions.get(clazz);
}
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
*
* @return CAN NOT RETURN NULL
*/
public
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final SubscriptionManager subManager) {
// // whenever our subscriptions change, this map is cleared.
// final MapTree<Class<?>, ArrayList<Subscription>> cached = this.superClassSubscriptionsMulti;
//
// ArrayList<Subscription> subs = cached.get(clazz1, clazz2);
//
// if (subs == null) {
// // types was not empty, so collect subscriptions for each type and collate them
//
// // save the subscriptions
// final Class<?>[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response
// final Class<?>[] superClasses2 = this.superClass.getSuperClasses(clazz2); // never returns null, cached response
//
// Class<?> superClass1;
// Class<?> superClass2;
// ArrayList<Subscription> superSubs;
// Subscription sub;
//
// final int length1 = superClasses1.length;
// final int length2 = superClasses2.length;
//
// subs = new ArrayList<Subscription>(length1 + length2);
//
// for (int i = 0; i < length1; i++) {
// superClass1 = superClasses1[i];
//
// // only go over subtypes
// if (superClass1 == clazz1) {
// continue;
// }
//
// for (int j = 0; j < length2; j++) {
// superClass2 = superClasses2[j];
//
// // only go over subtypes
// if (superClass2 == clazz2) {
// continue;
// }
//
// superSubs = subManager.getExactAsArray(superClass1, superClass2);
// if (superSubs != null) {
// for (int k = 0; k < superSubs.size(); k++) {
// sub = superSubs.get(k);
//
// if (sub.getHandlerAccess().acceptsSubtypes()) {
// subs.add(sub);
// }
// }
// }
// }
// }
// subs.trimToSize();
// cached.put(subs, clazz1, clazz2);
// }
// return subs;
return null;
}
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
*
* @return CAN NOT RETURN NULL
*/
public
ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Class<?> clazz3,
final SubscriptionManager subManager) {
// // whenever our subscriptions change, this map is cleared.
// final MapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
//
// ArrayList<Subscription> subs = local.get(clazz1, clazz2, clazz3);
//
// if (subs == null) {
// // types was not empty, so collect subscriptions for each type and collate them
//
// // save the subscriptions
// final Class<?>[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response
// final Class<?>[] superClasses2 = this.superClass.getSuperClasses(clazz2); // never returns null, cached response
// final Class<?>[] superClasses3 = this.superClass.getSuperClasses(clazz3); // never returns null, cached response
//
// Class<?> superClass1;
// Class<?> superClass2;
// Class<?> superClass3;
// ArrayList<Subscription> superSubs;
// Subscription sub;
//
// final int length1 = superClasses1.length;
// final int length2 = superClasses2.length;
// final int length3 = superClasses3.length;
//
// subs = new ArrayList<Subscription>(length1 + length2 + length3);
//
// for (int i = 0; i < length1; i++) {
// superClass1 = superClasses1[i];
//
// // only go over subtypes
// if (superClass1 == clazz1) {
// continue;
// }
//
// for (int j = 0; j < length2; j++) {
// superClass2 = superClasses2[j];
//
// // only go over subtypes
// if (superClass2 == clazz2) {
// continue;
// }
//
// for (int k = 0; k < length3; k++) {
// superClass3 = superClasses3[j];
//
// // only go over subtypes
// if (superClass3 == clazz3) {
// continue;
// }
//
// superSubs = subManager.getExactAsArray(superClass1, superClass2, superClass3);
// if (superSubs != null) {
// for (int m = 0; m < superSubs.size(); m++) {
// sub = superSubs.get(m);
//
// if (sub.getHandlerAccess().acceptsSubtypes()) {
// subs.add(sub);
// }
// }
// }
// }
// }
// }
// subs.trimToSize();
// local.put(subs, clazz1, clazz2, clazz3);
// }
//
// return subs;
return null;
}
}

View File

@ -220,7 +220,7 @@ public class SubscriptionManagerTest extends AssertSupport {
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
@ -247,7 +247,7 @@ public class SubscriptionManagerTest extends AssertSupport {
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, errorHandler);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);