WIP single writer principle. have subsPerMessage + subsPerSuperMessage working
This commit is contained in:
parent
9e06d16855
commit
3674d6031b
@ -133,8 +133,8 @@ class MessageBus implements IMessageBus {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriptionManager.subscribe(listener);
|
// subscriptionManager.subscribe(listener);
|
||||||
// subscriptionWriter.subscribe(listener);
|
subscriptionWriter.subscribe(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -144,8 +144,8 @@ class MessageBus implements IMessageBus {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriptionManager.unsubscribe(listener);
|
// subscriptionManager.unsubscribe(listener);
|
||||||
// subscriptionWriter.unsubscribe(listener);
|
subscriptionWriter.unsubscribe(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -40,18 +40,29 @@ class PublisherExactWithSuperTypes implements Publisher {
|
|||||||
void publish(final Synchrony synchrony, final Object message1) {
|
void publish(final Synchrony synchrony, final Object message1) {
|
||||||
try {
|
try {
|
||||||
final SubscriptionManager subManager = this.subManager;
|
final SubscriptionManager subManager = this.subManager;
|
||||||
final Class<?> messageClass = message1.getClass();
|
final Class<?> message1Class = message1.getClass();
|
||||||
|
boolean hasSubs = false;
|
||||||
|
|
||||||
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null
|
|
||||||
|
|
||||||
// Run subscriptions
|
// Run subscriptions
|
||||||
|
final Subscription[] subscriptions = subManager.getExactAsArray(message1Class); // can return null
|
||||||
if (subscriptions != null) {
|
if (subscriptions != null) {
|
||||||
|
hasSubs = true;
|
||||||
synchrony.publish(subscriptions, message1);
|
synchrony.publish(subscriptions, message1);
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
// Dead Event must EXACTLY MATCH (no subclasses)
|
|
||||||
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
|
|
||||||
|
|
||||||
|
// Run superClasses
|
||||||
|
final Subscription[] superSubscriptions = subManager.getSuperExactAsArray(message1Class); // can return null
|
||||||
|
if (superSubscriptions != null) {
|
||||||
|
hasSubs = true;
|
||||||
|
synchrony.publish(superSubscriptions, message1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Run dead message subscriptions
|
||||||
|
if (!hasSubs) {
|
||||||
|
// Dead Event must EXACTLY MATCH (no subclasses)
|
||||||
|
final Subscription[] deadSubscriptions = subManager.getExactAsArray(DeadMessage.class); // can return null
|
||||||
if (deadSubscriptions != null) {
|
if (deadSubscriptions != null) {
|
||||||
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
|
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package dorkbox.util.messagebus.subscription;
|
package dorkbox.util.messagebus.subscription;
|
||||||
|
|
||||||
|
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||||
import dorkbox.util.messagebus.common.HashMapTree;
|
import dorkbox.util.messagebus.common.HashMapTree;
|
||||||
import dorkbox.util.messagebus.common.MessageHandler;
|
import dorkbox.util.messagebus.common.MessageHandler;
|
||||||
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
|
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
|
||||||
@ -25,9 +26,8 @@ import dorkbox.util.messagebus.utils.VarArgUtils;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||||
/**
|
/**
|
||||||
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
|
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
|
||||||
*/
|
*/
|
||||||
@ -43,18 +43,29 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
public final
|
public final
|
||||||
class SubscriptionManager {
|
class SubscriptionManager {
|
||||||
public static final float LOAD_FACTOR = 0.8F;
|
public static final float LOAD_FACTOR = 0.8F;
|
||||||
|
public static final Subscription[] SUBSCRIPTIONS = new Subscription[0];
|
||||||
|
|
||||||
// TODO: during startup, precalculate the number of subscription listeners and x2 to save as subsPerListener expected max size
|
// TODO: during startup, pre-calculate the number of subscription listeners and x2 to save as subsPerListener expected max size
|
||||||
|
|
||||||
|
|
||||||
// ONLY used by SUB/UNSUB
|
// ONLY used by SUB/UNSUB
|
||||||
// remember already processed classes that do not contain any message handlers
|
// remember already processed classes that do not contain any message handlers
|
||||||
private final ConcurrentMap<Class<?>, Boolean> nonListeners;
|
private final IdentityMap<Class<?>, Boolean> nonListeners;
|
||||||
|
|
||||||
|
// ONLY used by SUB/UNSUB
|
||||||
// all subscriptions per messageHandler type
|
// all subscriptions per messageHandler type
|
||||||
// this map provides fast access for subscribing and unsubscribing
|
// this map provides fast access for subscribing and unsubscribing
|
||||||
// once a collection of subscriptions is stored it does not change
|
// once a collection of subscriptions is stored it does not change
|
||||||
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
|
private final IdentityMap<Class<?>, Subscription[]> subsPerListener;
|
||||||
|
|
||||||
|
|
||||||
|
// all subscriptions per message type. We perpetually KEEP the types
|
||||||
|
private volatile IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle;
|
||||||
|
|
||||||
|
// keeps track of all subscriptions of the super classes of a message type.
|
||||||
|
private volatile IdentityMap<Class<?>, Subscription[]> subsPerMessageSuperSingle;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -63,10 +74,6 @@ class SubscriptionManager {
|
|||||||
private final SubscriptionUtils subUtils;
|
private final SubscriptionUtils subUtils;
|
||||||
private final VarArgUtils varArgUtils;
|
private final VarArgUtils varArgUtils;
|
||||||
|
|
||||||
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
|
|
||||||
// this is the primary list for dispatching a specific message
|
|
||||||
// write access is synchronized and happens only when a listener of a specific class is registered the first time
|
|
||||||
final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerMessageSingle;
|
|
||||||
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
|
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
|
||||||
|
|
||||||
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
|
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
|
||||||
@ -74,6 +81,18 @@ class SubscriptionManager {
|
|||||||
|
|
||||||
private final ClassUtils classUtils;
|
private final ClassUtils classUtils;
|
||||||
|
|
||||||
|
|
||||||
|
// Recommended for best performance while adhering to the "single writer principle"
|
||||||
|
private final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsPerMessageSingleREF =
|
||||||
|
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
||||||
|
IdentityMap.class,
|
||||||
|
"subsPerMessageSingle");
|
||||||
|
|
||||||
|
private final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsPerMessageSuperSingleRef =
|
||||||
|
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
|
||||||
|
IdentityMap.class,
|
||||||
|
"subsPerMessageSuperSingle");
|
||||||
|
|
||||||
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
|
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
|
||||||
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
|
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
|
||||||
// correct type too)
|
// correct type too)
|
||||||
@ -85,10 +104,13 @@ class SubscriptionManager {
|
|||||||
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
|
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
|
||||||
|
|
||||||
// modified ONLY during SUB/UNSUB
|
// modified ONLY during SUB/UNSUB
|
||||||
this.nonListeners = new ConcurrentHashMap<Class<?>, Boolean>(4, LOAD_FACTOR, numberOfThreads);
|
nonListeners = new IdentityMap<Class<?>, Boolean>(16, LOAD_FACTOR);
|
||||||
|
subsPerListener = new IdentityMap<>(32, LOAD_FACTOR);
|
||||||
|
subsPerMessageSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||||
|
subsPerMessageSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Subscription[]>(32, LOAD_FACTOR, numberOfThreads);
|
|
||||||
subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Subscription[]>(32, LOAD_FACTOR, numberOfThreads);
|
|
||||||
|
|
||||||
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
|
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
|
||||||
|
|
||||||
@ -97,16 +119,19 @@ class SubscriptionManager {
|
|||||||
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but improves performance of handlers
|
// it's a hit on SUB/UNSUB, but improves performance of handlers
|
||||||
this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR, numberOfThreads);
|
this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR, numberOfThreads);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public
|
public
|
||||||
void shutdown() {
|
void shutdown() {
|
||||||
this.nonListeners.clear();
|
this.nonListeners.clear();
|
||||||
|
|
||||||
this.subscriptionsPerMessageSingle.clear();
|
this.subsPerMessageSingle.clear();
|
||||||
|
this.subsPerMessageSuperSingle.clear();
|
||||||
this.subscriptionsPerMessageMulti.clear();
|
this.subscriptionsPerMessageMulti.clear();
|
||||||
|
|
||||||
this.subscriptionsPerListener.clear();
|
this.subsPerListener.clear();
|
||||||
this.classUtils.shutdown();
|
this.classUtils.shutdown();
|
||||||
clear();
|
clear();
|
||||||
}
|
}
|
||||||
@ -154,13 +179,10 @@ class SubscriptionManager {
|
|||||||
|
|
||||||
// this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist.
|
// this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist.
|
||||||
// ONCE subscriptions are in THIS map, they are considered AVAILABLE.
|
// ONCE subscriptions are in THIS map, they are considered AVAILABLE.
|
||||||
Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
Subscription[] subscriptions = this.subsPerListener.get(listenerClass);
|
||||||
|
|
||||||
// the subscriptions from the map were null, so create them
|
// the subscriptions from the map were null, so create them
|
||||||
if (subscriptions == null) {
|
if (subscriptions == null) {
|
||||||
// it is important to note that this section CAN be repeated.
|
|
||||||
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
|
|
||||||
|
|
||||||
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
|
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
|
||||||
final int handlersSize = messageHandlers.length;
|
final int handlersSize = messageHandlers.length;
|
||||||
|
|
||||||
@ -170,7 +192,11 @@ class SubscriptionManager {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the subscriptions
|
||||||
|
subscriptions = new Subscription[handlersSize];
|
||||||
|
|
||||||
|
final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle = subsPerMessageSingleREF.get(this);
|
||||||
|
// final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle = this.subsPerMessageSingle;
|
||||||
|
|
||||||
Subscription subscription;
|
Subscription subscription;
|
||||||
|
|
||||||
@ -178,10 +204,6 @@ class SubscriptionManager {
|
|||||||
Class<?>[] messageHandlerTypes;
|
Class<?>[] messageHandlerTypes;
|
||||||
Class<?> handlerType;
|
Class<?> handlerType;
|
||||||
|
|
||||||
// create the subscriptions
|
|
||||||
final ConcurrentMap<Class<?>, Subscription[]> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
|
|
||||||
subscriptions = new Subscription[handlersSize];
|
|
||||||
|
|
||||||
for (int i = 0; i < handlersSize; i++) {
|
for (int i = 0; i < handlersSize; i++) {
|
||||||
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
|
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
|
||||||
messageHandler = messageHandlers[i];
|
messageHandler = messageHandlers[i];
|
||||||
@ -197,7 +219,7 @@ class SubscriptionManager {
|
|||||||
handlerType = messageHandlerTypes[0];
|
handlerType = messageHandlerTypes[0];
|
||||||
|
|
||||||
if (!subsPerMessageSingle.containsKey(handlerType)) {
|
if (!subsPerMessageSingle.containsKey(handlerType)) {
|
||||||
subsPerMessageSingle.put(handlerType, new Subscription[0]);
|
subsPerMessageSingle.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -208,50 +230,47 @@ class SubscriptionManager {
|
|||||||
|
|
||||||
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
|
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
|
||||||
|
|
||||||
final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub
|
|
||||||
if (previousSubs != null) {
|
|
||||||
// another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure
|
|
||||||
// all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUCT the correct list again.
|
|
||||||
// This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle.
|
|
||||||
|
|
||||||
// since nothing is yet "subscribed" we can assign the correct values for everything now
|
subsPerListener.put(listenerClass, subscriptions); // activates this sub for sub/unsub
|
||||||
subscriptions = previousSubs;
|
|
||||||
} else {
|
|
||||||
// we can now safely add for publication AND subscribe since the data structures are consistent
|
|
||||||
for (int i = 0; i < handlersSize; i++) {
|
|
||||||
// register the super types/varity types
|
|
||||||
subUtils.register(listenerClass, this);
|
|
||||||
|
|
||||||
subscription = subscriptions[i];
|
// we can now safely add for publication AND subscribe since the data structures are consistent
|
||||||
subscription.subscribe(listener); // register this callback listener to this subscription
|
for (int i = 0; i < handlersSize; i++) {
|
||||||
|
subscription = subscriptions[i];
|
||||||
|
subscription.subscribe(listener); // register this callback listener to this subscription
|
||||||
|
|
||||||
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
|
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
|
||||||
messageHandler = messageHandlers[i];
|
messageHandler = messageHandlers[i];
|
||||||
|
|
||||||
// register for publication
|
// register for publication
|
||||||
messageHandlerTypes = messageHandler.getHandledMessages();
|
messageHandlerTypes = messageHandler.getHandledMessages();
|
||||||
handlerType = messageHandlerTypes[0];
|
handlerType = messageHandlerTypes[0];
|
||||||
|
|
||||||
// makes this subscription visible for publication
|
|
||||||
final Subscription[] currentSubs = subsPerMessageSingle.get(handlerType);
|
|
||||||
final int currentLength = currentSubs.length;
|
|
||||||
|
|
||||||
// add the new subscription to the beginning of the array
|
// makes this subscription visible for publication
|
||||||
final Subscription[] newSubs = new Subscription[currentLength + 1];
|
final Subscription[] currentSubs = subsPerMessageSingle.get(handlerType);
|
||||||
newSubs[0] = subscription;
|
final int currentLength = currentSubs.length;
|
||||||
System.arraycopy(currentSubs, 0, newSubs, 1, currentLength);
|
|
||||||
subsPerMessageSingle.put(handlerType, newSubs);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
// add the new subscription to the beginning of the array
|
||||||
|
final Subscription[] newSubs = new Subscription[currentLength + 1];
|
||||||
|
newSubs[0] = subscription;
|
||||||
|
System.arraycopy(currentSubs, 0, newSubs, 1, currentLength);
|
||||||
|
subsPerMessageSingle.put(handlerType, newSubs);
|
||||||
|
|
||||||
|
// update the super types/varity types
|
||||||
|
registerSuperSubs(handlerType);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// subscriptions already exist and must only be updated
|
subsPerMessageSingleREF.lazySet(this, subsPerMessageSingle);
|
||||||
Subscription subscription;
|
// SUBS_SINGLE.set(this, subsPerMessageSingle);
|
||||||
for (int i = 0; i < subscriptions.length; i++) {
|
// this.subsPerMessageSingle = subsPerMessageSingle;
|
||||||
subscription = subscriptions[i];
|
}
|
||||||
subscription.subscribe(listener);
|
else {
|
||||||
|
// subscriptions already exist and must only be updated
|
||||||
|
Subscription subscription;
|
||||||
|
for (int i = 0; i < subscriptions.length; i++) {
|
||||||
|
subscription = subscriptions[i];
|
||||||
|
subscription.subscribe(listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,7 +286,7 @@ class SubscriptionManager {
|
|||||||
// these are concurrent collections
|
// these are concurrent collections
|
||||||
clear();
|
clear();
|
||||||
|
|
||||||
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
final Subscription[] subscriptions = this.subsPerListener.get(listenerClass);
|
||||||
if (subscriptions != null) {
|
if (subscriptions != null) {
|
||||||
Subscription subscription;
|
Subscription subscription;
|
||||||
|
|
||||||
@ -278,6 +297,67 @@ class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private
|
||||||
|
void registerSuperSubs(final Class<?> clazz) {
|
||||||
|
final Class<?>[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response
|
||||||
|
|
||||||
|
final IdentityMap<Class<?>, Subscription[]> local = subsPerMessageSuperSingleRef.get(this);
|
||||||
|
// final IdentityMap<Class<?>, Subscription[]> local = this.subsPerMessageSuperSingle;
|
||||||
|
|
||||||
|
// types was not empty, so collect subscriptions for each type and collate them
|
||||||
|
// save the subscriptions
|
||||||
|
|
||||||
|
Class<?> superClass;
|
||||||
|
Subscription[] superSubs;
|
||||||
|
Subscription sub;
|
||||||
|
|
||||||
|
final int length = superClasses.length;
|
||||||
|
int superSubLength;
|
||||||
|
final ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length);
|
||||||
|
|
||||||
|
// walks through all of the subscriptions that might exist for super types, and if applicable, save them
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
superClass = superClasses[i];
|
||||||
|
superSubs = getExactAsArray(superClass);
|
||||||
|
|
||||||
|
if (superSubs != null) {
|
||||||
|
superSubLength = superSubs.length;
|
||||||
|
for (int j = 0; j < superSubLength; j++) {
|
||||||
|
sub = superSubs[j];
|
||||||
|
|
||||||
|
if (sub.getHandler().acceptsSubtypes()) {
|
||||||
|
subsAsList.add(sub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final int size = subsAsList.size();
|
||||||
|
if (size > 0) {
|
||||||
|
Subscription[] subs = new Subscription[size];
|
||||||
|
subsAsList.toArray(subs);
|
||||||
|
local.put(clazz, subs);
|
||||||
|
|
||||||
|
|
||||||
|
subsPerMessageSuperSingleRef.lazySet(this, local);
|
||||||
|
// subsPerMessageSuperSingle = local;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public
|
public
|
||||||
AtomicBoolean getVarArgPossibility() {
|
AtomicBoolean getVarArgPossibility() {
|
||||||
@ -291,7 +371,7 @@ class SubscriptionManager {
|
|||||||
|
|
||||||
public
|
public
|
||||||
void clear() {
|
void clear() {
|
||||||
this.subUtils.clear();
|
// this.subUtils.clear();
|
||||||
// this.varArgUtils.clear();
|
// this.varArgUtils.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -369,12 +449,20 @@ class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public
|
public
|
||||||
Subscription[] getExactAsArray(final Class<?> messageClass) {
|
Subscription[] getExactAsArray(final Class<?> messageClass) {
|
||||||
return subscriptionsPerMessageSingle.get(messageClass);
|
return (Subscription[]) subsPerMessageSingleREF.get(this).get(messageClass);
|
||||||
|
// return subsPerMessageSingle.get(messageClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public
|
||||||
|
Subscription[] getSuperExactAsArray(final Class<?> messageClass) {
|
||||||
|
// whenever our subscriptions change, this map is cleared.
|
||||||
|
return (Subscription[]) subsPerMessageSuperSingleRef.get(this).get(messageClass);
|
||||||
|
// return this.subsPerMessageSuperSingle.get(messageClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public
|
public
|
||||||
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
|
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
|
||||||
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
|
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
|
||||||
@ -422,27 +510,31 @@ class SubscriptionManager {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// can NOT return null
|
// can return null
|
||||||
public
|
public
|
||||||
Subscription[] getExactAndSuper(final Class<?> messageClass) {
|
Subscription[] getExactAndSuper(final Class<?> messageClass) {
|
||||||
Subscription[] collection = getExactAsArray(messageClass); // can return null
|
Subscription[] collection = getExactAsArray(messageClass); // can return null
|
||||||
|
|
||||||
// now publish superClasses
|
// now publish superClasses
|
||||||
final Subscription[] superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
|
final Subscription[] superSubscriptions = getSuperExactAsArray(messageClass); // can return null
|
||||||
|
|
||||||
if (collection != null) {
|
if (collection != null) {
|
||||||
final int length = collection.length;
|
if (superSubscriptions != null) {
|
||||||
final int lengthSuper = superSubscriptions.length;
|
// but both into a single array
|
||||||
|
final int length = collection.length;
|
||||||
|
final int lengthSuper = superSubscriptions.length;
|
||||||
|
|
||||||
final Subscription[] newSubs = new Subscription[length + lengthSuper];
|
final Subscription[] newSubs = new Subscription[length + lengthSuper];
|
||||||
System.arraycopy(collection, 0, newSubs, 0, length);
|
System.arraycopy(collection, 0, newSubs, 0, length);
|
||||||
System.arraycopy(superSubscriptions, 0, newSubs, length, lengthSuper);
|
System.arraycopy(superSubscriptions, 0, newSubs, length, lengthSuper);
|
||||||
|
|
||||||
return newSubs;
|
return newSubs;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
return superSubscriptions;
|
return collection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return superSubscriptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
// can return null
|
// can return null
|
||||||
|
@ -15,27 +15,25 @@
|
|||||||
*/
|
*/
|
||||||
package dorkbox.util.messagebus.utils;
|
package dorkbox.util.messagebus.utils;
|
||||||
|
|
||||||
|
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||||
|
|
||||||
import java.lang.reflect.Array;
|
import java.lang.reflect.Array;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public final
|
public final
|
||||||
class ClassUtils {
|
class ClassUtils {
|
||||||
|
|
||||||
private final Map<Class<?>, Class<?>> arrayCache;
|
private final IdentityMap<Class<?>, Class<?>> arrayCache;
|
||||||
private final Map<Class<?>, Class<?>[]> superClassesCache;
|
private final IdentityMap<Class<?>, Class<?>[]> superClassesCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* These data structures are never reset because the class hierarchy doesn't change at runtime
|
* These data structures are never reset because the class hierarchy doesn't change at runtime
|
||||||
*/
|
*/
|
||||||
public
|
public
|
||||||
ClassUtils(final float loadFactor) {
|
ClassUtils(final float loadFactor) {
|
||||||
// this.arrayCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1);
|
this.arrayCache = new IdentityMap<Class<?>, Class<?>>(32);
|
||||||
// this.superClassesCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1);
|
this.superClassesCache = new IdentityMap<Class<?>, Class<?>[]>(32);
|
||||||
this.arrayCache = new IdentityHashMap<Class<?>, Class<?>>(32);
|
|
||||||
this.superClassesCache = new IdentityHashMap<Class<?>, Class<?>[]>(32);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,7 +46,7 @@ class ClassUtils {
|
|||||||
public
|
public
|
||||||
Class<?>[] getSuperClasses(final Class<?> clazz) {
|
Class<?>[] getSuperClasses(final Class<?> clazz) {
|
||||||
// this is never reset, since it never needs to be.
|
// this is never reset, since it never needs to be.
|
||||||
final Map<Class<?>, Class<?>[]> cache = this.superClassesCache;
|
final IdentityMap<Class<?>, Class<?>[]> cache = this.superClassesCache;
|
||||||
|
|
||||||
Class<?>[] classes = cache.get(clazz);
|
Class<?>[] classes = cache.get(clazz);
|
||||||
|
|
||||||
@ -101,7 +99,7 @@ class ClassUtils {
|
|||||||
public
|
public
|
||||||
Class<?> getArrayClass(final Class<?> c) {
|
Class<?> getArrayClass(final Class<?> c) {
|
||||||
// this is never reset, since it never needs to be.
|
// this is never reset, since it never needs to be.
|
||||||
final Map<Class<?>, Class<?>> cache = this.arrayCache;
|
final IdentityMap<Class<?>, Class<?>> cache = this.arrayCache;
|
||||||
Class<?> clazz = cache.get(c);
|
Class<?> clazz = cache.get(c);
|
||||||
|
|
||||||
if (clazz == null) {
|
if (clazz == null) {
|
||||||
|
@ -15,13 +15,12 @@
|
|||||||
*/
|
*/
|
||||||
package dorkbox.util.messagebus.utils;
|
package dorkbox.util.messagebus.utils;
|
||||||
|
|
||||||
|
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||||
import dorkbox.util.messagebus.common.HashMapTree;
|
import dorkbox.util.messagebus.common.HashMapTree;
|
||||||
import dorkbox.util.messagebus.subscription.Subscription;
|
import dorkbox.util.messagebus.subscription.Subscription;
|
||||||
import dorkbox.util.messagebus.subscription.SubscriptionManager;
|
import dorkbox.util.messagebus.subscription.SubscriptionManager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
public final
|
public final
|
||||||
class SubscriptionUtils {
|
class SubscriptionUtils {
|
||||||
@ -30,7 +29,9 @@ class SubscriptionUtils {
|
|||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
||||||
// it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one
|
// it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one
|
||||||
private final Map<Class<?>, Subscription[]> superClassSubscriptions;
|
|
||||||
|
// keeps track of all subscriptions of the super classes of a message type.
|
||||||
|
private volatile IdentityMap<Class<?>, Subscription[]> superClassSubscriptions;
|
||||||
private final HashMapTree<Class<?>, ArrayList<Subscription>> superClassSubscriptionsMulti;
|
private final HashMapTree<Class<?>, ArrayList<Subscription>> superClassSubscriptionsMulti;
|
||||||
|
|
||||||
|
|
||||||
@ -41,7 +42,7 @@ class SubscriptionUtils {
|
|||||||
|
|
||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but improves performance of handlers
|
// it's a hit on SUB/UNSUB, but improves performance of handlers
|
||||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Subscription[]>(8, loadFactor, numberOfThreads);
|
this.superClassSubscriptions = new IdentityMap<Class<?>, Subscription[]>(8, loadFactor);
|
||||||
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
|
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,65 +52,66 @@ class SubscriptionUtils {
|
|||||||
this.superClassSubscriptionsMulti.clear();
|
this.superClassSubscriptionsMulti.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ALWAYS register and create a cached version of the requested class + superClasses
|
// // ALWAYS register and create a cached version of the requested class + superClasses
|
||||||
public
|
// // ONLY called during subscribe
|
||||||
Subscription[] register(final Class<?> clazz, final SubscriptionManager subManager) {
|
// public
|
||||||
final Map<Class<?>, Subscription[]> local = this.superClassSubscriptions;
|
// Subscription[] register(final Class<?> clazz, final SubscriptionManager subManager) {
|
||||||
|
// final IdentityMap<Class<?>, Subscription[]> local = this.superClassSubscriptions;
|
||||||
// types was not empty, so collect subscriptions for each type and collate them
|
//
|
||||||
|
// // types was not empty, so collect subscriptions for each type and collate them
|
||||||
// save the subscriptions
|
//
|
||||||
final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
|
// // save the subscriptions
|
||||||
|
// final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
|
||||||
Class<?> superClass;
|
//
|
||||||
Subscription[] superSubs;
|
// Class<?> superClass;
|
||||||
Subscription sub;
|
// Subscription[] superSubs;
|
||||||
|
// Subscription sub;
|
||||||
final int length = superClasses.length;
|
//
|
||||||
int superSubLength;
|
// final int length = superClasses.length;
|
||||||
final ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length);
|
// int superSubLength;
|
||||||
|
// final ArrayList<Subscription> subsAsList = new ArrayList<Subscription>(length);
|
||||||
for (int i = 0; i < length; i++) {
|
//
|
||||||
superClass = superClasses[i];
|
// for (int i = 0; i < length; i++) {
|
||||||
superSubs = subManager.getExactAsArray(superClass);
|
// superClass = superClasses[i];
|
||||||
|
// superSubs = subManager.getExactAsArray(superClass);
|
||||||
if (superSubs != null) {
|
//
|
||||||
superSubLength = superSubs.length;
|
// if (superSubs != null) {
|
||||||
for (int j = 0; j < superSubLength; j++) {
|
// superSubLength = superSubs.length;
|
||||||
sub = superSubs[j];
|
// for (int j = 0; j < superSubLength; j++) {
|
||||||
|
// sub = superSubs[j];
|
||||||
if (sub.getHandler().acceptsSubtypes()) {
|
//
|
||||||
subsAsList.add(sub);
|
// if (sub.getHandler().acceptsSubtypes()) {
|
||||||
}
|
// subsAsList.add(sub);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
// }
|
||||||
Subscription[] subs = new Subscription[subsAsList.size()];
|
//
|
||||||
subsAsList.toArray(subs);
|
// final int size = subsAsList.size();
|
||||||
local.put(clazz, subs);
|
// if (size > 0) {
|
||||||
|
// Subscription[] subs = new Subscription[size];
|
||||||
return subs;
|
// subsAsList.toArray(subs);
|
||||||
}
|
// local.put(clazz, subs);
|
||||||
|
//
|
||||||
|
// superClassSubscriptions = local;
|
||||||
|
//
|
||||||
|
// return subs;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return null;
|
||||||
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an array COPY of the super subscriptions for the specified type.
|
* Returns an array COPY of the super subscriptions for the specified type.
|
||||||
* <p/>
|
* <p/>
|
||||||
* This ALSO checks to see if the superClass accepts subtypes.
|
* This ALSO checks to see if the superClass accepts subtypes.
|
||||||
*
|
*
|
||||||
* @return CAN NOT RETURN NULL
|
* @return CAN RETURN NULL
|
||||||
*/
|
*/
|
||||||
public
|
public
|
||||||
Subscription[] getSuperSubscriptions(final Class<?> clazz, final SubscriptionManager subManager) {
|
Subscription[] getSuperSubscriptions(final Class<?> clazz) {
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
final Map<Class<?>, Subscription[]> local = this.superClassSubscriptions;
|
return this.superClassSubscriptions.get(clazz);
|
||||||
Subscription[] subs = local.get(clazz);
|
|
||||||
|
|
||||||
if (subs != null) {
|
|
||||||
return subs;
|
|
||||||
}
|
|
||||||
|
|
||||||
return register(clazz, subManager);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user