Moved ConcurrentSet entries/etc into subscription -> BEST performance for subscription iteration w/o GC
This commit is contained in:
parent
618e4034f2
commit
b2cbd9c084
|
@ -39,12 +39,12 @@ package dorkbox.util.messagebus.subscription;
|
|||
|
||||
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||
import com.esotericsoftware.reflectasm.MethodAccess;
|
||||
import dorkbox.util.messagebus.common.Entry;
|
||||
import dorkbox.util.messagebus.common.MessageHandler;
|
||||
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
|
||||
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
|
||||
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
|
@ -65,8 +65,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
|||
*/
|
||||
public final
|
||||
class Subscription {
|
||||
private static final int GROW_SIZE = 8;
|
||||
|
||||
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
|
||||
public final int ID = ID_COUNTER.getAndIncrement();
|
||||
|
||||
|
@ -78,17 +76,18 @@ class Subscription {
|
|||
private final MessageHandler handler;
|
||||
private final IHandlerInvocation invocation;
|
||||
|
||||
// NOTE: this is still inside the single-writer! can use the same techniques as subscription manager (for thread safe publication)
|
||||
private int firstFreeSpot = 0; // only touched by a single thread
|
||||
private volatile Object[] listeners = new Object[GROW_SIZE]; // only modified by a single thread
|
||||
|
||||
private final IdentityMap<Object, Integer> listenerMap = new IdentityMap<>(GROW_SIZE);
|
||||
|
||||
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
|
||||
private static final AtomicReferenceFieldUpdater<Subscription, Object[]> listenersREF =
|
||||
private static final AtomicReferenceFieldUpdater<Subscription, Entry> headREF =
|
||||
AtomicReferenceFieldUpdater.newUpdater(Subscription.class,
|
||||
Object[].class,
|
||||
"listeners");
|
||||
Entry.class,
|
||||
"head");
|
||||
|
||||
|
||||
// This is only touched by a single thread!
|
||||
private final IdentityMap<Object, Entry> entries; // maintain a map of entries for FAST lookup during unsubscribe.
|
||||
|
||||
// this is still inside the single-writer, and can use the same techniques as subscription manager (for thread safe publication)
|
||||
public volatile Entry head; // reference to the first element
|
||||
|
||||
|
||||
public
|
||||
|
@ -102,6 +101,21 @@ class Subscription {
|
|||
}
|
||||
|
||||
this.invocation = invocation;
|
||||
|
||||
entries = new IdentityMap<>(32, SubscriptionManager.LOAD_FACTOR);
|
||||
|
||||
|
||||
if (handler.acceptsSubtypes()) {
|
||||
// keep a list of "super-class" messages that access this. This is updated by multiple threads. This is so we know WHAT
|
||||
// super-subscriptions to clear when we sub/unsub
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// called on shutdown for GC purposes
|
||||
public void clear() {
|
||||
this.entries.clear();
|
||||
this.head.clear();
|
||||
}
|
||||
|
||||
// only used in unit tests to verify that the subscription manager is working correctly
|
||||
|
@ -118,31 +132,14 @@ class Subscription {
|
|||
public
|
||||
void subscribe(final Object listener) {
|
||||
// single writer principle!
|
||||
Entry head = headREF.get(this);
|
||||
|
||||
Object[] localListeners = listenersREF.get(this);
|
||||
if (!entries.containsKey(listener)) {
|
||||
head = new Entry(listener, head);
|
||||
|
||||
final int length = localListeners.length;
|
||||
int spotToPlace = firstFreeSpot;
|
||||
|
||||
while (true) {
|
||||
if (spotToPlace >= length) {
|
||||
// if we couldn't find a place to put the listener, grow the array, but it is never shrunk
|
||||
localListeners = Arrays.copyOf(localListeners, length + GROW_SIZE, Object[].class);
|
||||
break;
|
||||
entries.put(listener, head);
|
||||
headREF.lazySet(this, head);
|
||||
}
|
||||
|
||||
if (localListeners[spotToPlace] == null) {
|
||||
break;
|
||||
}
|
||||
spotToPlace++;
|
||||
}
|
||||
|
||||
listenerMap.put(listener, spotToPlace);
|
||||
localListeners[spotToPlace] = listener;
|
||||
|
||||
// mark this spot as taken, so the next subscribe starts out a little ahead
|
||||
firstFreeSpot = spotToPlace + 1;
|
||||
listenersREF.lazySet(this, localListeners);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,31 +147,28 @@ class Subscription {
|
|||
*/
|
||||
public
|
||||
boolean unsubscribe(final Object listener) {
|
||||
// single writer principle!
|
||||
|
||||
final Integer integer = listenerMap.remove(listener);
|
||||
Object[] localListeners = listenersREF.get(this);
|
||||
|
||||
if (integer != null) {
|
||||
final int index = integer;
|
||||
firstFreeSpot = index;
|
||||
localListeners[index] = null;
|
||||
listenersREF.lazySet(this, localListeners);
|
||||
return true;
|
||||
Entry entry = entries.get(listener);
|
||||
if (entry == null || entry.getValue() == null) {
|
||||
// fast exit
|
||||
return false;
|
||||
}
|
||||
else {
|
||||
for (int i = 0; i < localListeners.length; i++) {
|
||||
if (localListeners[i] == listener) {
|
||||
firstFreeSpot = i;
|
||||
localListeners[i] = null;
|
||||
listenersREF.lazySet(this, localListeners);
|
||||
return true;
|
||||
}
|
||||
// single writer principle!
|
||||
Entry head = headREF.get(this);
|
||||
|
||||
if (entry != head) {
|
||||
entry.remove();
|
||||
}
|
||||
else {
|
||||
// if it was second, now it's first
|
||||
head = head.next();
|
||||
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
|
||||
}
|
||||
|
||||
firstFreeSpot = 0;
|
||||
return false;
|
||||
this.entries.remove(listener);
|
||||
headREF.lazySet(this, head);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -182,16 +176,7 @@ class Subscription {
|
|||
*/
|
||||
public
|
||||
int size() {
|
||||
// since this is ONLY used in unit tests, we count how many are non-null
|
||||
|
||||
int count = 0;
|
||||
for (int i = 0; i < listeners.length; i++) {
|
||||
if (listeners[i] != null) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
return this.entries.size;
|
||||
}
|
||||
|
||||
public
|
||||
|
@ -200,62 +185,46 @@ class Subscription {
|
|||
final int handleIndex = this.handler.getMethodIndex();
|
||||
final IHandlerInvocation invocation = this.invocation;
|
||||
|
||||
Entry current = headREF.get(this);
|
||||
Object listener;
|
||||
final Object[] localListeners = listenersREF.get(this);
|
||||
for (int i = 0; i < localListeners.length; i++) {
|
||||
listener = localListeners[i];
|
||||
if (listener != null) {
|
||||
while (current != null) {
|
||||
listener = current.getValue();
|
||||
current = current.next();
|
||||
|
||||
invocation.invoke(listener, handler, handleIndex, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public
|
||||
void publish(final Object message1, final Object message2) throws Throwable {
|
||||
// final MethodAccess handler = this.handler.getHandler();
|
||||
// final int handleIndex = this.handler.getMethodIndex();
|
||||
// final IHandlerInvocation invocation = this.invocation;
|
||||
//
|
||||
// Iterator<Object> iterator;
|
||||
// Object listener;
|
||||
//
|
||||
// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
|
||||
// listener = iterator.next();
|
||||
//
|
||||
// invocation.invoke(listener, handler, handleIndex, message1, message2);
|
||||
// }
|
||||
final MethodAccess handler = this.handler.getHandler();
|
||||
final int handleIndex = this.handler.getMethodIndex();
|
||||
final IHandlerInvocation invocation = this.invocation;
|
||||
|
||||
Entry current = headREF.get(this);
|
||||
Object listener;
|
||||
while (current != null) {
|
||||
listener = current.getValue();
|
||||
current = current.next();
|
||||
|
||||
invocation.invoke(listener, handler, handleIndex, message1, message2);
|
||||
}
|
||||
}
|
||||
|
||||
public
|
||||
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
|
||||
// final MethodAccess handler = this.handler.getHandler();
|
||||
// final int handleIndex = this.handler.getMethodIndex();
|
||||
// final IHandlerInvocation invocation = this.invocation;
|
||||
//
|
||||
// Iterator<Object> iterator;
|
||||
// Object listener;
|
||||
//
|
||||
// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
|
||||
// listener = iterator.next();
|
||||
//
|
||||
// invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
|
||||
// }
|
||||
}
|
||||
final MethodAccess handler = this.handler.getHandler();
|
||||
final int handleIndex = this.handler.getMethodIndex();
|
||||
final IHandlerInvocation invocation = this.invocation;
|
||||
|
||||
public
|
||||
void publish(final Object... messages) throws Throwable {
|
||||
// final MethodAccess handler = this.handler.getHandler();
|
||||
// final int handleIndex = this.handler.getMethodIndex();
|
||||
// final IHandlerInvocation invocation = this.invocation;
|
||||
//
|
||||
// Iterator<Object> iterator;
|
||||
// Object listener;
|
||||
//
|
||||
// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
|
||||
// listener = iterator.next();
|
||||
//
|
||||
// invocation.invoke(listener, handler, handleIndex, messages);
|
||||
// }
|
||||
Entry current = headREF.get(this);
|
||||
Object listener;
|
||||
while (current != null) {
|
||||
listener = current.getValue();
|
||||
current = current.next();
|
||||
|
||||
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -16,18 +16,16 @@
|
|||
package dorkbox.util.messagebus.subscription;
|
||||
|
||||
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||
import dorkbox.util.messagebus.common.HashMapTree;
|
||||
import dorkbox.util.messagebus.common.ClassTree;
|
||||
import dorkbox.util.messagebus.common.MessageHandler;
|
||||
import dorkbox.util.messagebus.common.MultiClass;
|
||||
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
|
||||
import dorkbox.util.messagebus.utils.ClassUtils;
|
||||
import dorkbox.util.messagebus.utils.ReflectionUtils;
|
||||
import dorkbox.util.messagebus.utils.SubscriptionUtils;
|
||||
import dorkbox.util.messagebus.utils.VarArgUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
/**
|
||||
|
@ -42,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
|||
* @author dorkbox, llc
|
||||
* Date: 2/2/15
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final
|
||||
class SubscriptionManager {
|
||||
public static final float LOAD_FACTOR = 0.8F;
|
||||
|
@ -65,6 +64,7 @@ class SubscriptionManager {
|
|||
|
||||
// all subscriptions of a message type.
|
||||
private volatile IdentityMap<Class<?>, Subscription[]> subsSingle;
|
||||
private volatile IdentityMap<MultiClass, Subscription[]> subsMulti;
|
||||
|
||||
// keeps track of all subscriptions of the super classes of a message type.
|
||||
private volatile IdentityMap<Class<?>, Subscription[]> subsSuperSingle;
|
||||
|
@ -77,7 +77,7 @@ class SubscriptionManager {
|
|||
|
||||
// In order to force the "Single writer principle" on subscribe & unsubscribe, they are within WRITE LOCKS. They could be dispatched
|
||||
// to another thread, however we do NOT want them asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is
|
||||
// called before.
|
||||
// called before. A WriteLock doesn't perform any better here than synchronized does.
|
||||
private final Object singleWriterLock = new Object();
|
||||
|
||||
|
||||
|
@ -88,7 +88,7 @@ class SubscriptionManager {
|
|||
private final SubscriptionUtils subUtils;
|
||||
private final VarArgUtils varArgUtils;
|
||||
|
||||
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
|
||||
private final ClassTree<Class<?>> classTree;
|
||||
|
||||
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
|
||||
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
|
||||
|
@ -102,6 +102,12 @@ class SubscriptionManager {
|
|||
IdentityMap.class,
|
||||
"subsSingle");
|
||||
|
||||
private static final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsMultiREF =
|
||||
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
||||
IdentityMap.class,
|
||||
"subsMulti");
|
||||
|
||||
|
||||
private static final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsSuperSingleREF =
|
||||
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
||||
IdentityMap.class,
|
||||
|
@ -131,14 +137,16 @@ class SubscriptionManager {
|
|||
nonListeners = new IdentityMap<Class<?>, Boolean>(16, LOAD_FACTOR);
|
||||
subsPerListener = new IdentityMap<>(32, LOAD_FACTOR);
|
||||
subsSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||
subsMulti = new IdentityMap<MultiClass, Subscription[]>(32, LOAD_FACTOR);
|
||||
|
||||
|
||||
subsSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||
subsVaritySingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||
subsSuperVaritySingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||
|
||||
|
||||
|
||||
|
||||
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
|
||||
this.classTree = new ClassTree<Class<?>>();
|
||||
|
||||
this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR, numberOfThreads);
|
||||
|
||||
|
@ -151,6 +159,22 @@ class SubscriptionManager {
|
|||
|
||||
public
|
||||
void shutdown() {
|
||||
|
||||
// explicitly clear out the subscriptions
|
||||
final IdentityMap.Entries<Class<?>, Subscription[]> entries = subsPerListener.entries();
|
||||
for (IdentityMap.Entry<Class<?>, Subscription[]> entry : entries) {
|
||||
final Subscription[] subscriptions = entry.value;
|
||||
if (subscriptions != null) {
|
||||
Subscription subscription;
|
||||
|
||||
for (int i = 0; i < subscriptions.length; i++) {
|
||||
subscription = subscriptions[i];
|
||||
subscription.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
this.nonListeners.clear();
|
||||
|
||||
this.subsPerListener.clear();
|
||||
|
@ -160,8 +184,7 @@ class SubscriptionManager {
|
|||
this.subsVaritySingle.clear();
|
||||
this.subsSuperVaritySingle.clear();
|
||||
|
||||
|
||||
this.subscriptionsPerMessageMulti.clear();
|
||||
this.classTree.clear();
|
||||
|
||||
this.classUtils.shutdown();
|
||||
}
|
||||
|
@ -174,38 +197,6 @@ class SubscriptionManager {
|
|||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||
// use-case 99% of the time)
|
||||
synchronized (singleWriterLock) {
|
||||
|
||||
// when subscribing, this is a GREAT opportunity to figure out the classes/objects loaded -- their hierarchy, AND generate UUIDs
|
||||
// for each CLASS that can be accessed. This then lets us lookup a UUID for each object that comes in -- if an ID is found (for
|
||||
// any part of it's object hierarchy) -- it means that we have that listeners for that object. this is MUCH faster checking if
|
||||
// we have subscriptions first (and failing).
|
||||
//
|
||||
// so during subscribe we can check "getUUID for all parameter.class accessed by this listener" -> then during publish "lookup
|
||||
// UUID of incoming message.class" (+ it's super classes, if necessary) -> then check if UUID exists. If yes, then we know there
|
||||
// are subs. if no - then it's a dead message.
|
||||
//
|
||||
// This lets us accomplish TWO things
|
||||
// 1) be able quickly determine if there are dead messages
|
||||
// 2) be able to create "multi-class" UUIDs, when two+ classes are represented (always) by the same UUID, by a clever mixing of
|
||||
// the classes individual UUIDs.
|
||||
//
|
||||
// The generation of UUIDs happens ONLY during subscribe, and during publish they are looked up. This UUID can be a simple
|
||||
// AtomicInteger that starts a MIN_VALUE and count's up.
|
||||
|
||||
|
||||
// note: we can do PRE-STARTUP instrumentation (ie, BEFORE any classes are loaded by the classloader) and inject the UUID into
|
||||
// every object (as a public static final field), then use reflection to look up this value. It would go something like this:
|
||||
// 1) scan every class for annotations that match
|
||||
// 2) for each method that has our annotation -- get the list of classes + hierarchy that are the parameters for the method
|
||||
// 3) inject the UUID field into each class object that #2 returns, only if it doesn't already exist. use invalid field names
|
||||
// (ie: start with numbers or ? or ^ or something
|
||||
//
|
||||
// then during SUB/UNSUB/PUB, we use this UUID for everything (and we can have multi-UUID lookups for the 'multi-arg' thing).
|
||||
// If there is no UUID, then we just abort the SUB/UNSUB or send a deadmessage
|
||||
|
||||
|
||||
|
||||
|
||||
final IdentityMap<Class<?>, Boolean> nonListeners = this.nonListeners;
|
||||
if (nonListeners.containsKey(listenerClass)) {
|
||||
// early reject of known classes that do not define message handlers
|
||||
|
@ -231,7 +222,9 @@ class SubscriptionManager {
|
|||
subscriptions = new Subscription[handlersSize];
|
||||
|
||||
// access a snapshot of the subscriptions (single-writer-principle)
|
||||
final IdentityMap<Class<?>, Subscription[]> localSubs = subsSingleREF.get(this);
|
||||
final IdentityMap<Class<?>, Subscription[]> singleSubs = subsSingleREF.get(this);
|
||||
// final IdentityMap<MultiClass, Subscription[]> multiSubs = subsMultiREF.get(this);
|
||||
|
||||
// final IdentityMap<Class<?>, Subscription[]> localSuperSubs = subsSuperSingleREF.get(this);
|
||||
// final IdentityMap<Class<?>, Subscription[]> localVaritySubs = subsVaritySingleREF.get(this);
|
||||
// final IdentityMap<Class<?>, Subscription[]> localSuperVaritySubs = subsSuperVaritySingleREF.get(this);
|
||||
|
@ -242,24 +235,73 @@ class SubscriptionManager {
|
|||
Class<?>[] messageHandlerTypes;
|
||||
Class<?> handlerType;
|
||||
|
||||
|
||||
// Prepare all of the subscriptions
|
||||
for (int i = 0; i < handlersSize; i++) {
|
||||
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
|
||||
messageHandler = messageHandlers[i];
|
||||
|
||||
// is this handler able to accept var args?
|
||||
if (messageHandler.getVarArgClass() != null) {
|
||||
varArgPossibility.lazySet(true);
|
||||
}
|
||||
// if (messageHandler.getVarArgClass() != null) {
|
||||
// varArgPossibility.lazySet(true);
|
||||
// }
|
||||
|
||||
// now create a list of subscriptions for this specific handlerType (but don't add anything yet).
|
||||
// we only store things based on the FIRST type (for lookup) then parse the rest of the types during publication
|
||||
messageHandlerTypes = messageHandler.getHandledMessages();
|
||||
// final int handlerSize = messageHandlerTypes.length;
|
||||
// switch (handlerSize) {
|
||||
// case 0: {
|
||||
// // if a publisher publishes VOID, it calls a method with 0 parameters (that's been subscribed)
|
||||
// // This is the SAME THING as having Void as a parameter!!
|
||||
// handlerType = Void.class;
|
||||
//
|
||||
// if (!singleSubs.containsKey(handlerType)) {
|
||||
// // this is copied to a larger array if necessary, but needs to be SOMETHING before subsPerListener is added
|
||||
// singleSubs.put(handlerType, SUBSCRIPTIONS);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case 1: {
|
||||
handlerType = messageHandlerTypes[0];
|
||||
|
||||
if (!localSubs.containsKey(handlerType)) {
|
||||
if (!singleSubs.containsKey(handlerType)) {
|
||||
// this is copied to a larger array if necessary, but needs to be SOMETHING before subsPerListener is added
|
||||
localSubs.put(handlerType, SUBSCRIPTIONS);
|
||||
singleSubs.put(handlerType, SUBSCRIPTIONS);
|
||||
}
|
||||
// break;
|
||||
// }
|
||||
// case 2: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes[0],
|
||||
// messageHandlerTypes[1]);
|
||||
//
|
||||
// if (!multiSubs.containsKey(multiClass)) {
|
||||
// // this is copied to a larger array if necessary, but needs to be SOMETHING before subsPerListener is added
|
||||
// multiSubs.put(multiClass, SUBSCRIPTIONS);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case 3: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes[0],
|
||||
// messageHandlerTypes[1],
|
||||
// messageHandlerTypes[2]);
|
||||
//
|
||||
// if (!multiSubs.containsKey(multiClass)) {
|
||||
// // this is copied to a larger array if necessary, but needs to be SOMETHING before subsPerListener is added
|
||||
// multiSubs.put(multiClass, SUBSCRIPTIONS);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// default: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes);
|
||||
//
|
||||
// if (!multiSubs.containsKey(multiClass)) {
|
||||
// // this is copied to a larger array if necessary, but needs to be SOMETHING before subsPerListener is added
|
||||
// multiSubs.put(multiClass, SUBSCRIPTIONS);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
// create the subscription. This can be thrown away if the subscription succeeds in another thread
|
||||
subscription = new Subscription(listenerClass, messageHandler);
|
||||
|
@ -272,7 +314,7 @@ class SubscriptionManager {
|
|||
subsPerListener.put(listenerClass, subscriptions);
|
||||
|
||||
|
||||
// we can now safely add for publication AND subscribe since the data structures are consistent
|
||||
// add for publication AND subscribe since the data structures are consistent
|
||||
for (int i = 0; i < handlersSize; i++) {
|
||||
subscription = subscriptions[i];
|
||||
subscription.subscribe(listener); // register this callback listener to this subscription
|
||||
|
@ -282,28 +324,100 @@ class SubscriptionManager {
|
|||
|
||||
// register for publication
|
||||
messageHandlerTypes = messageHandler.getHandledMessages();
|
||||
// final int handlerSize = messageHandlerTypes.length;
|
||||
//
|
||||
// switch (handlerSize) {
|
||||
// case 0: {
|
||||
// handlerType = Void.class;
|
||||
//
|
||||
// // makes this subscription visible for publication
|
||||
// final Subscription[] currentSubs = singleSubs.get(handlerType);
|
||||
// final int currentLength = currentSubs.length;
|
||||
//
|
||||
// // add the new subscription to the array
|
||||
// final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
|
||||
// newSubs[currentLength] = subscription;
|
||||
// singleSubs.put(handlerType, newSubs);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case 1: {
|
||||
handlerType = messageHandlerTypes[0];
|
||||
|
||||
|
||||
// makes this subscription visible for publication
|
||||
final Subscription[] currentSubs = localSubs.get(handlerType);
|
||||
final Subscription[] currentSubs = singleSubs.get(handlerType);
|
||||
final int currentLength = currentSubs.length;
|
||||
|
||||
// add the new subscription to the array
|
||||
final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
|
||||
newSubs[currentLength] = subscription;
|
||||
localSubs.put(handlerType, newSubs);
|
||||
singleSubs.put(handlerType, newSubs);
|
||||
|
||||
|
||||
// update the varity/super types
|
||||
// registerExtraSubs(handlerType, localSubs, localSuperSubs, localVaritySubs);
|
||||
// registerExtraSubs(handlerType, singleSubs, localSuperSubs, localVaritySubs);
|
||||
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case 2: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes[0],
|
||||
// messageHandlerTypes[1]);
|
||||
// // makes this subscription visible for publication
|
||||
// final Subscription[] currentSubs = multiSubs.get(multiClass);
|
||||
// final int currentLength = currentSubs.length;
|
||||
//
|
||||
// // add the new subscription to the array
|
||||
// final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
|
||||
// newSubs[currentLength] = subscription;
|
||||
// multiSubs.put(multiClass, newSubs);
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case 3: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes[0],
|
||||
// messageHandlerTypes[1],
|
||||
// messageHandlerTypes[2]);
|
||||
// // makes this subscription visible for publication
|
||||
// final Subscription[] currentSubs = multiSubs.get(multiClass);
|
||||
// final int currentLength = currentSubs.length;
|
||||
//
|
||||
// // add the new subscription to the array
|
||||
// final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
|
||||
// newSubs[currentLength] = subscription;
|
||||
// multiSubs.put(multiClass, newSubs);
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// default: {
|
||||
// final MultiClass multiClass = classTree.get(messageHandlerTypes);
|
||||
// // makes this subscription visible for publication
|
||||
// final Subscription[] currentSubs = multiSubs.get(multiClass);
|
||||
// final int currentLength = currentSubs.length;
|
||||
//
|
||||
// // add the new subscription to the array
|
||||
// final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
|
||||
// newSubs[currentLength] = subscription;
|
||||
// multiSubs.put(multiClass, newSubs);
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
|
||||
// save this snapshot back to the original (single writer principle)
|
||||
subsSingleREF.lazySet(this, localSubs);
|
||||
subsSingleREF.lazySet(this, singleSubs);
|
||||
// subsMultiREF.lazySet(this, multiSubs);
|
||||
// subsSuperSingleREF.lazySet(this, localSuperSubs);
|
||||
// subsVaritySingleREF.lazySet(this, localVaritySubs);
|
||||
// subsSuperVaritySingleREF.lazySet(this, localSuperVaritySubs);
|
||||
|
||||
|
||||
// only dump the super subscritions if it is a COMPLETELY NEW subscription. If it's not new, then the heirarchy isn't
|
||||
// changing for super subscriptions
|
||||
subsSuperSingleREF.lazySet(this, new IdentityMap(32));
|
||||
}
|
||||
else {
|
||||
// subscriptions already exist and must only be updated
|
||||
|
@ -425,85 +539,44 @@ class SubscriptionManager {
|
|||
return varArgUtils;
|
||||
}
|
||||
|
||||
// inside a write lock
|
||||
// add this subscription to each of the handled types
|
||||
// to activate this sub for publication
|
||||
private
|
||||
void registerMulti(final Subscription subscription, final Class<?> listenerClass,
|
||||
final Map<Class<?>, List<Subscription>> subsPerMessageSingle,
|
||||
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final AtomicBoolean varArgPossibility) {
|
||||
|
||||
final MessageHandler handler = subscription.getHandler();
|
||||
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
|
||||
final int size = messageHandlerTypes.length;
|
||||
|
||||
final Class<?> type0 = messageHandlerTypes[0];
|
||||
|
||||
switch (size) {
|
||||
case 0: {
|
||||
// TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method?
|
||||
errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass);
|
||||
return;
|
||||
}
|
||||
case 1: {
|
||||
// // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
|
||||
// final List<Subscription> cachedSubs = listCache.get();
|
||||
// List<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs);
|
||||
// if (subs == null) {
|
||||
// listCache.set(new CopyOnWriteArrayList<Subscription>());
|
||||
//// listCache.set(new ArrayList<Subscription>(8));
|
||||
// subs = cachedSubs;
|
||||
//
|
||||
// // is this handler able to accept var args?
|
||||
// if (handler.getVarArgClass() != null) {
|
||||
// varArgPossibility.lazySet(true);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// subs.add(subscription);
|
||||
return;
|
||||
}
|
||||
case 2: {
|
||||
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
|
||||
if (subs == null) {
|
||||
subs = new ArrayList<Subscription>();
|
||||
|
||||
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
|
||||
}
|
||||
|
||||
subs.add(subscription);
|
||||
return;
|
||||
}
|
||||
case 3: {
|
||||
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
|
||||
if (subs == null) {
|
||||
subs = new ArrayList<Subscription>();
|
||||
|
||||
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
|
||||
}
|
||||
|
||||
subs.add(subscription);
|
||||
return;
|
||||
}
|
||||
default: {
|
||||
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
|
||||
if (subs == null) {
|
||||
subs = new ArrayList<Subscription>();
|
||||
|
||||
subsPerMessageMulti.put(subs, messageHandlerTypes);
|
||||
}
|
||||
|
||||
subs.add(subscription);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?> messageClass) {
|
||||
return (Subscription[]) subsSingleREF.get(this).get(messageClass);
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||
// never returns null
|
||||
final MultiClass multiClass = classTree.get(messageClass1,
|
||||
messageClass2);
|
||||
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
|
||||
// never returns null
|
||||
final MultiClass multiClass = classTree.get(messageClass1,
|
||||
messageClass2,
|
||||
messageClass3);
|
||||
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?>[] messageClasses) {
|
||||
// never returns null
|
||||
final MultiClass multiClass = classTree.get(messageClasses);
|
||||
|
||||
return (Subscription[]) subsMultiREF.get(this).get(multiClass);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSuperSubs(final Class<?> messageClass) {
|
||||
|
@ -514,8 +587,8 @@ class SubscriptionManager {
|
|||
|
||||
final IdentityMap<Class<?>, Subscription[]> localSubs = subsSingleREF.get(this);
|
||||
|
||||
Subscription sub;
|
||||
Class<?> superClass;
|
||||
Subscription sub;
|
||||
Subscription[] superSubs;
|
||||
boolean hasSubs = false;
|
||||
|
||||
|
@ -543,184 +616,144 @@ class SubscriptionManager {
|
|||
return subsAsList.toArray(new Subscription[0]);
|
||||
}
|
||||
else {
|
||||
// TODO: shortcut out if there are no handlers that accept subtypes
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// IT IS CRITICAL TO REMEMBER: The subscriptions that are remembered here DO NOT CHANGE (only the listeners inside then change)
|
||||
// IT IS CRITICAL TO REMEMBER: The subscriptions that are remembered here DO NOT CHANGE (only the listeners inside them change). if we
|
||||
// subscribe a super/child class -- THEN these subscriptions change!
|
||||
|
||||
// return (Subscription[]) subsSuperSingleREF.get(this).get(messageClass);
|
||||
}
|
||||
|
||||
|
||||
public
|
||||
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
|
||||
}
|
||||
|
||||
public
|
||||
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
|
||||
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
|
||||
}
|
||||
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
|
||||
Subscription[] getSuperSubs(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||
// save the subscriptions
|
||||
final Class<?>[] superClasses1 = this.classUtils.getSuperClasses(messageClass1); // never returns null, cached response
|
||||
final Class<?>[] superClasses2 = this.classUtils.getSuperClasses(messageClass2); // never returns null, cached response
|
||||
|
||||
if (collection != null) {
|
||||
return collection.toArray(new Subscription[0]);
|
||||
final IdentityMap<MultiClass, Subscription[]> localSubs = subsMultiREF.get(this);
|
||||
|
||||
Class<?> superClass1;
|
||||
Class<?> superClass2;
|
||||
Subscription sub;
|
||||
Subscription[] superSubs;
|
||||
boolean hasSubs = false;
|
||||
|
||||
|
||||
final int length1 = superClasses1.length;
|
||||
final int length2 = superClasses2.length;
|
||||
|
||||
ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length1 + length2);
|
||||
|
||||
for (int i = 0; i < length1; i++) {
|
||||
superClass1 = superClasses1[i];
|
||||
|
||||
// only go over subtypes
|
||||
if (superClass1 == messageClass1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int j = 0; j < length2; j++) {
|
||||
superClass2 = superClasses2[j];
|
||||
|
||||
// only go over subtypes
|
||||
if (superClass2 == messageClass2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// never returns null
|
||||
final MultiClass multiClass = classTree.get(superClass1,
|
||||
superClass2);
|
||||
superSubs = localSubs.get(multiClass);
|
||||
if (superSubs != null) {
|
||||
for (int k = 0; k < superSubs.length; k++) {
|
||||
sub = superSubs[k];
|
||||
|
||||
if (sub.getHandler().acceptsSubtypes()) {
|
||||
subsAsList.add(sub);
|
||||
hasSubs = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// subsAsList now contains ALL of the super-class subscriptions.
|
||||
if (hasSubs) {
|
||||
return subsAsList.toArray(new Subscription[0]);
|
||||
}
|
||||
else {
|
||||
// TODO: shortcut out if there are no handlers that accept subtypes
|
||||
return null;
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getSubs(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
|
||||
|
||||
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
|
||||
|
||||
if (collection != null) {
|
||||
return collection.toArray(new Subscription[0]);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getExactAndSuper_MUST_HAVE_SEPERATE_CALLS(final Class<?> messageClass) {
|
||||
Subscription[] collection = getSubs(messageClass); // can return null
|
||||
|
||||
// now superClasses
|
||||
Class[] types = ReflectionUtils.getSuperTypes(messageClass);
|
||||
|
||||
// NOTE: have to recalculate SUPER classes for the specified class, because WE DO NOT know this class during subscription time!
|
||||
|
||||
|
||||
// whenever our subscriptions change, this map is cleared.
|
||||
// final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
|
||||
|
||||
// ArrayList<Subscription> subs = local.get(clazz);
|
||||
//
|
||||
// if (subs == null) {
|
||||
// // types was not empty, so collect subscriptions for each type and collate them
|
||||
//
|
||||
// // save the subscriptions
|
||||
// final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
|
||||
//
|
||||
// Class<?> superClass;
|
||||
// ArrayList<Subscription> superSubs;
|
||||
// Subscription sub;
|
||||
//
|
||||
// final int length = superClasses.length;
|
||||
// int superSubLength;
|
||||
// subs = new ArrayList<Subscription>(length);
|
||||
//
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// superClass = superClasses[i];
|
||||
// superSubs = subscriber.getExactAsArray(superClass);
|
||||
//
|
||||
// if (superSubs != null) {
|
||||
// superSubLength = superSubs.size();
|
||||
// for (int j = 0; j < superSubLength; j++) {
|
||||
// sub = superSubs.get(j);
|
||||
//
|
||||
// if (sub.getHandler().acceptsSubtypes()) {
|
||||
// subs.add(sub);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// subs.trimToSize();
|
||||
// local.put(clazz, subs);
|
||||
// }
|
||||
//
|
||||
// return subs;
|
||||
|
||||
|
||||
|
||||
|
||||
//
|
||||
// final Subscription[] superSubscriptions = getSuperSubs(messageClass); // can return null
|
||||
//
|
||||
// if (collection != null) {
|
||||
// if (superSubscriptions != null) {
|
||||
// // but both into a single array
|
||||
// final int length = collection.length;
|
||||
// final int lengthSuper = superSubscriptions.length;
|
||||
//
|
||||
// final Subscription[] newSubs = new Subscription[length + lengthSuper];
|
||||
// System.arraycopy(collection, 0, newSubs, 0, length);
|
||||
// System.arraycopy(superSubscriptions, 0, newSubs, length, lengthSuper);
|
||||
//
|
||||
// return newSubs;
|
||||
// }
|
||||
//
|
||||
// return collection;
|
||||
// }
|
||||
//
|
||||
// return superSubscriptions;
|
||||
return null;
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
|
||||
|
||||
// now publish superClasses
|
||||
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
|
||||
this); // NOT return null
|
||||
|
||||
if (collection != null) {
|
||||
collection = new ArrayList<Subscription>(collection);
|
||||
|
||||
if (!superSubs.isEmpty()) {
|
||||
collection.addAll(superSubs);
|
||||
}
|
||||
}
|
||||
else if (!superSubs.isEmpty()) {
|
||||
collection = superSubs;
|
||||
}
|
||||
|
||||
if (collection != null) {
|
||||
return collection.toArray(new Subscription[0]);
|
||||
}
|
||||
else {
|
||||
// ArrayList<Subscription> collection = getSubs(messageClass1, messageClass2); // can return null
|
||||
//
|
||||
// // now publish superClasses
|
||||
// final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
|
||||
// this); // NOT return null
|
||||
//
|
||||
// if (collection != null) {
|
||||
// collection = new ArrayList<Subscription>(collection);
|
||||
//
|
||||
// if (!superSubs.isEmpty()) {
|
||||
// collection.addAll(superSubs);
|
||||
// }
|
||||
// }
|
||||
// else if (!superSubs.isEmpty()) {
|
||||
// collection = superSubs;
|
||||
// }
|
||||
//
|
||||
// if (collection != null) {
|
||||
// return collection.toArray(new Subscription[0]);
|
||||
// }
|
||||
// else {
|
||||
return null;
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
// can return null
|
||||
public
|
||||
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
|
||||
|
||||
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
|
||||
|
||||
// now publish superClasses
|
||||
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
|
||||
this); // NOT return null
|
||||
|
||||
if (collection != null) {
|
||||
collection = new ArrayList<Subscription>(collection);
|
||||
|
||||
if (!superSubs.isEmpty()) {
|
||||
collection.addAll(superSubs);
|
||||
}
|
||||
}
|
||||
else if (!superSubs.isEmpty()) {
|
||||
collection = superSubs;
|
||||
//
|
||||
// ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
|
||||
//
|
||||
// // now publish superClasses
|
||||
// final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
|
||||
// this); // NOT return null
|
||||
//
|
||||
// if (collection != null) {
|
||||
// collection = new ArrayList<Subscription>(collection);
|
||||
//
|
||||
// if (!superSubs.isEmpty()) {
|
||||
// collection.addAll(superSubs);
|
||||
// }
|
||||
// }
|
||||
// else if (!superSubs.isEmpty()) {
|
||||
// collection = superSubs;
|
||||
// }
|
||||
//
|
||||
// if (collection != null) {
|
||||
// return collection.toArray(new Subscription[0]);
|
||||
// }
|
||||
// else {
|
||||
return null;
|
||||
// }
|
||||
}
|
||||
|
||||
if (collection != null) {
|
||||
return collection.toArray(new Subscription[0]);
|
||||
}
|
||||
else {
|
||||
|
||||
public
|
||||
Subscription[] getSuperSubs(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user