Added collection perftest. code polish

This commit is contained in:
nathan 2015-05-19 04:00:41 +02:00
parent 6668c2ff81
commit 964f7ee565
10 changed files with 336 additions and 131 deletions

View File

@ -9,7 +9,7 @@ import org.jctools.util.Pow2;
import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode; import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.common.thread.BooleanHolder; import dorkbox.util.messagebus.common.thread.BooleanHolder;
@ -201,7 +201,7 @@ public class MultiMBassador implements IMessageBus {
SubscriptionManager manager = this.subscriptionManager; SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass(); Class<?> messageClass = message.getClass();
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
BooleanHolder subsPublished = this.booleanThreadLocal.get(); BooleanHolder subsPublished = this.booleanThreadLocal.get();
subsPublished.bool = false; subsPublished.bool = false;
@ -222,7 +222,7 @@ public class MultiMBassador implements IMessageBus {
} }
if (!this.forceExactMatches) { if (!this.forceExactMatches) {
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass); StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses // now get superClasses
if (superSubscriptions != null) { if (superSubscriptions != null) {
current = superSubscriptions.head; current = superSubscriptions.head;
@ -240,7 +240,7 @@ public class MultiMBassador implements IMessageBus {
if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) { if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass); StrongConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass, 1); asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message; asArray[0] = message;
@ -255,7 +255,7 @@ public class MultiMBassador implements IMessageBus {
} }
} }
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) { if (asArray == null) {
@ -277,7 +277,7 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished.bool) { if (!subsPublished.bool) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); StrongConcurrentSet<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message); DeadMessage deadMessage = new DeadMessage(message);
@ -300,7 +300,7 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass1 = message1.getClass(); Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass(); Class<?> messageClass2 = message2.getClass();
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
BooleanHolder subsPublished = this.booleanThreadLocal.get(); BooleanHolder subsPublished = this.booleanThreadLocal.get();
subsPublished.bool = false; subsPublished.bool = false;
@ -320,7 +320,7 @@ public class MultiMBassador implements IMessageBus {
} }
if (!this.forceExactMatches) { if (!this.forceExactMatches) {
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// now get superClasses // now get superClasses
if (superSubscriptions != null) { if (superSubscriptions != null) {
current = superSubscriptions.head; current = superSubscriptions.head;
@ -338,7 +338,7 @@ public class MultiMBassador implements IMessageBus {
if (messageClass1 == messageClass2) { if (messageClass1 == messageClass2) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1; asArray[0] = message1;
@ -354,7 +354,7 @@ public class MultiMBassador implements IMessageBus {
} }
} }
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) { if (asArray == null) {
@ -373,7 +373,7 @@ public class MultiMBassador implements IMessageBus {
} }
} }
} else { } else {
StrongConcurrentSetV8<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
@ -400,7 +400,7 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished.bool) { if (!subsPublished.bool) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); StrongConcurrentSet<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2); DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -424,7 +424,7 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass2 = message2.getClass(); Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass(); Class<?> messageClass3 = message3.getClass();
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
BooleanHolder subsPublished = this.booleanThreadLocal.get(); BooleanHolder subsPublished = this.booleanThreadLocal.get();
subsPublished.bool = false; subsPublished.bool = false;
@ -445,7 +445,7 @@ public class MultiMBassador implements IMessageBus {
if (!this.forceExactMatches) { if (!this.forceExactMatches) {
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get superClasses // now get superClasses
if (superSubscriptions != null) { if (superSubscriptions != null) {
current = superSubscriptions.head; current = superSubscriptions.head;
@ -462,7 +462,7 @@ public class MultiMBassador implements IMessageBus {
if (manager.hasVarArgPossibility()) { if (manager.hasVarArgPossibility()) {
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
Object[] asArray = null; Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1; asArray[0] = message1;
@ -479,7 +479,7 @@ public class MultiMBassador implements IMessageBus {
} }
} }
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) { if (asArray == null) {
@ -499,7 +499,7 @@ public class MultiMBassador implements IMessageBus {
} }
} }
} else { } else {
StrongConcurrentSetV8<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get array based superClasses (but only if those ALSO accept vararg) // now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
@ -527,7 +527,7 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished.bool) { if (!subsPublished.bool) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); StrongConcurrentSet<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2, message3); DeadMessage deadMessage = new DeadMessage(message1, message2, message3);

View File

@ -7,6 +7,7 @@ import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SubscriptionUtils; import dorkbox.util.messagebus.common.SubscriptionUtils;
import dorkbox.util.messagebus.common.VarArgPossibility; import dorkbox.util.messagebus.common.VarArgPossibility;
@ -49,25 +50,25 @@ public class SubscriptionManager {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time // write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerMessageSingle; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerMessageMulti; private final HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing // this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently // write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change // once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerListener; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> subscriptionsPerListener;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> superClassSubscriptions; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> superClassSubscriptionsMulti; private final HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> superClassSubscriptionsMulti;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSubscriptions; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> varArgSubscriptions;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptions; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptionsMulti; private final HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> varArgSuperClassSubscriptionsMulti;
// stripe size of maps for concurrency // stripe size of maps for concurrency
private final int STRIPE_SIZE; private final int STRIPE_SIZE;
@ -84,29 +85,29 @@ public class SubscriptionManager {
{ {
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR); this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1); this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR); this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, StrongConcurrentSet<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// only used during SUB/UNSUB // only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1); this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
} }
// modified by N threads // modified by N threads
{ {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR); this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSet<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR); this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSet<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
} }
this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR);
this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR);
} }
public void shutdown() { public void shutdown() {
@ -144,11 +145,11 @@ public class SubscriptionManager {
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility,
// but for concurrency because there are race conditions here if we don't. // but for concurrency because there are race conditions here if we don't.
synchronized(listenerClass) { synchronized(listenerClass) {
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerListener2 = this.subscriptionsPerListener; ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> subsPerListener2 = this.subscriptionsPerListener;
StrongConcurrentSetV8<Subscription> subsPerListener = subsPerListener2.get(listenerClass); StrongConcurrentSet<Subscription> subsPerListener = subsPerListener2.get(listenerClass);
if (subsPerListener == null) { if (subsPerListener == null) {
// a listener is subscribed for the first time // a listener is subscribed for the first time
StrongConcurrentSetV8<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); StrongConcurrentSet<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size(); int handlersSize = messageHandlers.size();
if (handlersSize == 0) { if (handlersSize == 0) {
@ -158,8 +159,8 @@ public class SubscriptionManager {
} else { } else {
VarArgPossibility varArgPossibility = this.varArgPossibility; VarArgPossibility varArgPossibility = this.varArgPossibility;
subsPerListener = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1); subsPerListener = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR);
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle; ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
ISetEntry<MessageHandler> current = messageHandlers.head; ISetEntry<MessageHandler> current = messageHandlers.head;
MessageHandler messageHandler; MessageHandler messageHandler;
@ -167,7 +168,7 @@ public class SubscriptionManager {
messageHandler = current.getValue(); messageHandler = current.getValue();
current = current.next(); current = current.next();
StrongConcurrentSetV8<Subscription> subsPerType = null; StrongConcurrentSet<Subscription> subsPerType = null;
// now add this subscription to each of the handled types // now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages(); Class<?>[] types = messageHandler.getHandledMessages();
@ -178,7 +179,7 @@ public class SubscriptionManager {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get(); subsPerType = subHolderConcurrent.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0],subsPerType); StrongConcurrentSet<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
@ -196,7 +197,7 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle; SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get(); subsPerType = subHolderSingle.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); StrongConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
@ -211,7 +212,7 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle; SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get(); subsPerType = subHolderSingle.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); StrongConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
@ -227,7 +228,7 @@ public class SubscriptionManager {
SubscriptionHolder subHolderSingle = this.subHolderSingle; SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get(); subsPerType = subHolderSingle.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); StrongConcurrentSet<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
@ -283,7 +284,7 @@ public class SubscriptionManager {
clearConcurrentCollections(); clearConcurrentCollections();
synchronized(listenerClass) { synchronized(listenerClass) {
StrongConcurrentSetV8<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass); StrongConcurrentSet<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) { if (subscriptions != null) {
ISetEntry<Subscription> current = subscriptions.head; ISetEntry<Subscription> current = subscriptions.head;
Subscription subscription; Subscription subscription;
@ -305,18 +306,18 @@ public class SubscriptionManager {
} }
// CAN RETURN NULL // CAN RETURN NULL
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public final StrongConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType); return this.subscriptionsPerMessageSingle.get(messageType);
} }
// CAN RETURN NULL // CAN RETURN NULL
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) { public final StrongConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.get(messageType1, messageType2); return this.subscriptionsPerMessageMulti.get(messageType1, messageType2);
} }
// CAN RETURN NULL // CAN RETURN NULL
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) { public final StrongConcurrentSet<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
} }
@ -328,15 +329,15 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSubscriptions(Class<?> messageClass) { public StrongConcurrentSet<Subscription> getVarArgSubscriptions(Class<?> messageClass) {
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSubscriptions; ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
StrongConcurrentSetV8<Subscription> subsPerType = subHolderConcurrent.get(); StrongConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast! // cache our subscriptions for super classes, so that their access can be fast!
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue()); subHolderConcurrent.set(subHolderConcurrent.initialValue());
@ -344,12 +345,12 @@ public class SubscriptionManager {
// this caches our array type. This is never cleared. // this caches our array type. This is never cleared.
Class<?> arrayVersion = this.utils.getArrayClass(messageClass); Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle; Map<Class<?>, StrongConcurrentSet<Subscription>> local2 = this.subscriptionsPerMessageSingle;
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
StrongConcurrentSetV8<Subscription> subs = local2.get(arrayVersion); StrongConcurrentSet<Subscription> subs = local2.get(arrayVersion);
if (subs != null) { if (subs != null) {
current = subs.head; current = subs.head;
while (current != null) { while (current != null) {
@ -371,27 +372,27 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) { public StrongConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptions; ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
StrongConcurrentSetV8<Subscription> subsPerType = subHolderConcurrent.get(); StrongConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast! // cache our subscriptions for super classes, so that their access can be fast!
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue()); subHolderConcurrent.set(subHolderConcurrent.initialValue());
Class<?> arrayVersion = this.utils.getArrayClass(messageClass); Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
StrongConcurrentSetV8<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true); StrongConcurrentSet<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
if (types.isEmpty()) { if (types.isEmpty()) {
return null; return null;
} }
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle; Map<Class<?>, StrongConcurrentSet<Subscription>> local2 = this.subscriptionsPerMessageSingle;
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
@ -404,7 +405,7 @@ public class SubscriptionManager {
superClass = current1.getValue(); superClass = current1.getValue();
current1 = current1.next(); current1 = current1.next();
StrongConcurrentSetV8<Subscription> subs = local2.get(superClass); StrongConcurrentSet<Subscription> subs = local2.get(superClass);
if (subs != null) { if (subs != null) {
current = subs.head; current = subs.head;
while (current != null) { while (current != null) {
@ -428,12 +429,12 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) { public StrongConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
StrongConcurrentSetV8<Subscription> subsPerType = null; StrongConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) { if (subsPerTypeLeaf != null) {
@ -443,15 +444,15 @@ public class SubscriptionManager {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get(); subsPerType = subHolderConcurrent.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
if (putIfAbsent != null) { if (putIfAbsent != null) {
// someone beat us // someone beat us
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
// the message class types are not the same, so look for a common superClass varArg subscription. // the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); StrongConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
@ -476,12 +477,12 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) { public StrongConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
StrongConcurrentSetV8<Subscription> subsPerType = null; StrongConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) { if (subsPerTypeLeaf != null) {
@ -491,16 +492,16 @@ public class SubscriptionManager {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get(); subsPerType = subHolderConcurrent.get();
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
if (putIfAbsent != null) { if (putIfAbsent != null) {
// someone beat us // someone beat us
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
// the message class types are not the same, so look for a common superClass varArg subscription. // the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); StrongConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); StrongConcurrentSet<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
@ -525,18 +526,18 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public final StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType) { public final StrongConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType) {
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptions; ConcurrentMap<Class<?>, StrongConcurrentSet<Subscription>> local = this.superClassSubscriptions;
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
StrongConcurrentSetV8<Subscription> subsPerType = subHolderConcurrent.get(); StrongConcurrentSet<Subscription> subsPerType = subHolderConcurrent.get();
// cache our subscriptions for super classes, so that their access can be fast! // cache our subscriptions for super classes, so that their access can be fast!
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
StrongConcurrentSetV8<Class<?>> types = this.utils.getSuperClasses(superType); StrongConcurrentSet<Class<?>> types = this.utils.getSuperClasses(superType);
if (types.isEmpty()) { if (types.isEmpty()) {
return null; return null;
} }
@ -544,7 +545,7 @@ public class SubscriptionManager {
// we are the first one in the map // we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue()); subHolderConcurrent.set(subHolderConcurrent.initialValue());
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle; Map<Class<?>, StrongConcurrentSet<Subscription>> local2 = this.subscriptionsPerMessageSingle;
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
@ -557,7 +558,7 @@ public class SubscriptionManager {
superClass = current1.getValue(); superClass = current1.getValue();
current1 = current1.next(); current1 = current1.next();
StrongConcurrentSetV8<Subscription> subs = local2.get(superClass); StrongConcurrentSet<Subscription> subs = local2.get(superClass);
if (subs != null) { if (subs != null) {
current = subs.head; current = subs.head;
while (current != null) { while (current != null) {
@ -580,12 +581,12 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) { public StrongConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2); HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
StrongConcurrentSetV8<Subscription> subsPerType = null; StrongConcurrentSet<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) { if (subsPerTypeLeaf != null) {
@ -596,18 +597,18 @@ public class SubscriptionManager {
subsPerType = subHolderSingle.get(); subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast! // cache our subscriptions for super classes, so that their access can be fast!
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue()); subHolderSingle.set(subHolderSingle.initialValue());
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
StrongConcurrentSetV8<Class<?>> types1 = this.utils.getSuperClasses(superType1); StrongConcurrentSet<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.utils.getSuperClasses(superType2); StrongConcurrentSet<Class<?>> types2 = this.utils.getSuperClasses(superType2);
StrongConcurrentSetV8<Subscription> subs; StrongConcurrentSet<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> leaf2;
ISetEntry<Subscription> current = null; ISetEntry<Subscription> current = null;
Subscription sub; Subscription sub;
@ -673,12 +674,12 @@ public class SubscriptionManager {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) { public StrongConcurrentSet<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
StrongConcurrentSetV8<Subscription> subsPerType; StrongConcurrentSet<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same // we DO NOT care about duplicate, because the answers will be the same
@ -690,19 +691,19 @@ public class SubscriptionManager {
subsPerType = subHolderSingle.get(); subsPerType = subHolderSingle.get();
// cache our subscriptions for super classes, so that their access can be fast! // cache our subscriptions for super classes, so that their access can be fast!
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); StrongConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue()); subHolderSingle.set(subHolderSingle.initialValue());
StrongConcurrentSetV8<Class<?>> types1 = this.utils.getSuperClasses(superType1); StrongConcurrentSet<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.utils.getSuperClasses(superType2); StrongConcurrentSet<Class<?>> types2 = this.utils.getSuperClasses(superType2);
StrongConcurrentSetV8<Class<?>> types3 = this.utils.getSuperClasses(superType3); StrongConcurrentSet<Class<?>> types3 = this.utils.getSuperClasses(superType3);
StrongConcurrentSetV8<Subscription> subs; StrongConcurrentSet<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> leaf2;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf3; HashMapTree<Class<?>, StrongConcurrentSet<Subscription>> leaf3;
ISetEntry<Subscription> current = null; ISetEntry<Subscription> current = null;
Subscription sub; Subscription sub;

View File

@ -17,7 +17,7 @@ import dorkbox.util.messagebus.annotations.Handler;
public class ReflectionUtils { public class ReflectionUtils {
public static StrongConcurrentSetV8<Method> getMethods(Class<?> target) { public static StrongConcurrentSetV8<Method> getMethods(Class<?> target) {
StrongConcurrentSetV8<Method> hashSet = new StrongConcurrentSetV8<Method>(16, .8F, 1); StrongConcurrentSetV8<Method> hashSet = new StrongConcurrentSetV8<Method>(16, .8F);
getMethods(target, hashSet); getMethods(target, hashSet);
return hashSet; return hashSet;
} }
@ -67,7 +67,7 @@ public class ReflectionUtils {
* @return A set of classes, each representing a super type of the root class * @return A set of classes, each representing a super type of the root class
*/ */
public static StrongConcurrentSetV8<Class<?>> getSuperTypes(Class<?> from) { public static StrongConcurrentSetV8<Class<?>> getSuperTypes(Class<?> from) {
StrongConcurrentSetV8<Class<?>> superclasses = new StrongConcurrentSetV8<Class<?>>(8, 0.8F, 1); StrongConcurrentSetV8<Class<?>> superclasses = new StrongConcurrentSetV8<Class<?>>(8, 0.8F);
collectInterfaces( from, superclasses ); collectInterfaces( from, superclasses );
@ -131,7 +131,7 @@ public class ReflectionUtils {
} }
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType) { public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType) {
A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8<AnnotatedElement>(16, .8F, 1)); A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8<AnnotatedElement>(16, .8F));
return annotation; return annotation;
} }

View File

@ -9,7 +9,8 @@ package dorkbox.util.messagebus.common;
*/ */
public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> { public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> {
public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) { public StrongConcurrentSetV8(int size, float loadFactor) {
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, stripeSize)); // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, 1));
} }
} }

View File

@ -11,7 +11,7 @@ public class SubscriptionUtils {
private final Map<Class<?>, Class<?>> arrayVersionCache; private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache; private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache; private final ConcurrentMap<Class<?>, StrongConcurrentSet<Class<?>>> superClassesCache;
private final ClassHolder classHolderSingle; private final ClassHolder classHolderSingle;
@ -19,8 +19,8 @@ public class SubscriptionUtils {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, loadFactor, stripeSize); this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, loadFactor, stripeSize); this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, loadFactor, stripeSize); this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSet<Class<?>>>(64, loadFactor, stripeSize);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize); this.classHolderSingle = new ClassHolder(loadFactor);
} }
/** /**
@ -28,24 +28,24 @@ public class SubscriptionUtils {
* never returns null * never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/ */
public StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz) { public StrongConcurrentSet<Class<?>> getSuperClasses(Class<?> clazz) {
return getSuperClasses(clazz, isArray(clazz)); return getSuperClasses(clazz, isArray(clazz));
} }
public final StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) { public final StrongConcurrentSet<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be. // this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache; ConcurrentMap<Class<?>, StrongConcurrentSet<Class<?>>> local = this.superClassesCache;
ClassHolder classHolderSingle = this.classHolderSingle; ClassHolder classHolderSingle = this.classHolderSingle;
StrongConcurrentSetV8<Class<?>> classes = classHolderSingle.get(); StrongConcurrentSet<Class<?>> classes = classHolderSingle.get();
StrongConcurrentSetV8<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes); StrongConcurrentSet<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
if (putIfAbsent == null) { if (putIfAbsent == null) {
// we are the first one in the map // we are the first one in the map
classHolderSingle.set(classHolderSingle.initialValue()); classHolderSingle.set(classHolderSingle.initialValue());
// it doesn't matter if concurrent access stomps on values, since they are always the same. // it doesn't matter if concurrent access stomps on values, since they are always the same.
StrongConcurrentSetV8<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); StrongConcurrentSet<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ISetEntry<Class<?>> current = superTypes.head; ISetEntry<Class<?>> current = superTypes.head;
Class<?> c; Class<?> c;

View File

@ -5,19 +5,17 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
public class ClassHolder extends ThreadLocal<StrongConcurrentSetV8<Class<?>>> { public class ClassHolder extends ThreadLocal<StrongConcurrentSetV8<Class<?>>> {
private final int stripeSize;
private final float loadFactor; private final float loadFactor;
public ClassHolder(float loadFactor, int stripeSize) { public ClassHolder(float loadFactor) {
super(); super();
this.stripeSize = stripeSize;
this.loadFactor = loadFactor; this.loadFactor = loadFactor;
} }
@Override @Override
public StrongConcurrentSetV8<Class<?>> initialValue() { public StrongConcurrentSetV8<Class<?>> initialValue() {
return new StrongConcurrentSetV8<Class<?>>(16, this.loadFactor, this.stripeSize); return new StrongConcurrentSetV8<Class<?>>(16, this.loadFactor);
} }
} }

View File

@ -5,19 +5,17 @@ import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionHolder extends ThreadLocal<StrongConcurrentSetV8<Subscription>> { public class SubscriptionHolder extends ThreadLocal<StrongConcurrentSetV8<Subscription>> {
private final int stripeSize;
private final float loadFactor; private final float loadFactor;
public SubscriptionHolder(float loadFactor, int stripeSize) { public SubscriptionHolder(float loadFactor) {
super(); super();
this.stripeSize = stripeSize;
this.loadFactor = loadFactor; this.loadFactor = loadFactor;
} }
@Override @Override
public StrongConcurrentSetV8<Subscription> initialValue() { public StrongConcurrentSetV8<Subscription> initialValue() {
return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor, this.stripeSize); return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor);
} }
} }

View File

@ -22,7 +22,7 @@ public class MessageListener {
private Class<?> listenerDefinition; private Class<?> listenerDefinition;
public MessageListener(Class<?> listenerDefinition, int size) { public MessageListener(Class<?> listenerDefinition, int size) {
this.handlers = new StrongConcurrentSetV8<MessageHandler>(size, 0.8F, 1); this.handlers = new StrongConcurrentSetV8<MessageHandler>(size, 0.8F);
this.listenerDefinition = listenerDefinition; this.listenerDefinition = listenerDefinition;
} }

View File

@ -25,7 +25,7 @@ public class MetadataReader {
StrongConcurrentSetV8<Method> allHandlers = ReflectionUtils.getMethods(target); StrongConcurrentSetV8<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
StrongConcurrentSetV8<Method> bottomMostHandlers = new StrongConcurrentSetV8<Method>(allHandlers.size(), 0.8F, 1); StrongConcurrentSetV8<Method> bottomMostHandlers = new StrongConcurrentSetV8<Method>(allHandlers.size(), 0.8F);
ISetEntry<Method> current = allHandlers.head; ISetEntry<Method> current = allHandlers.head;

View File

@ -0,0 +1,207 @@
package dorkbox.util.messagebus;
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MessageListener;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription;
public class PerfTest_Collections {
public static final int REPETITIONS = 10 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final float LOAD_FACTOR = 0.8F;
private static MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class);
public static void main(final String[] args) throws Exception {
final int size = 16;
System.out.println("reps:" + REPETITIONS + " size: " + size);
// have to warm-up the JVM.
System.err.print("\nWarming up JVM.");
// for (int i=0;i<2;i++) {
bench(size, new ConcurrentLinkedQueue<Subscription>(), false);
System.err.print(".");
bench(size, new ArrayList<Subscription>(size*2), false);
System.err.print(".");
bench(size, new ArrayDeque<Subscription>(size*2), false);
System.err.print(".");
bench(size, new ConcurrentLinkedQueue<Subscription>(), false);
System.err.print(".");
bench(size, new LinkedList<Subscription>(), false);
System.err.print(".");
// }
System.err.println("Done");
bench(size, new ArrayList<Subscription>(size*2));
bench(size, new ConcurrentLinkedQueue<Subscription>());
bench(size, new LinkedTransferQueue<Subscription>());
bench(size, new ArrayDeque<Subscription>(size*2));
bench(size, new ConcurrentLinkedQueue<Subscription>());
bench(size, new LinkedList<Subscription>());
bench(size, new StrongConcurrentSetV8<Subscription>(size*2, LOAD_FACTOR));
bench(size, Collections.<Subscription>newSetFromMap(new ConcurrentHashMapV8<Subscription, Boolean>(size*2, LOAD_FACTOR, 1)));
bench(size, new StrongConcurrentSet<Subscription>(size*2, LOAD_FACTOR));
bench(size, new HashSet<Subscription>());
// bench(size, new ConcurrentSkipListSet<Subscription>()); // needs comparable
}
public static void bench(final int size, Collection<Subscription> set) throws Exception {
bench(size, set, true);
}
public static void bench(final int size, Collection<Subscription> set, boolean showOutput) throws Exception {
final int warmupRuns = 2;
final int runs = 3;
for (int i=0;i<size;i++) {
for (MessageHandler x : messageListener.getHandlers()) {
set.add(new Subscription(x));
}
}
for (int i=2;i<6;i++) {
long average = averageRun(warmupRuns, runs, set, false, i, REPETITIONS);
if (showOutput) {
System.out.format("summary,IteratorPerfTest,%s - %,d (%d)\n", set.getClass().getSimpleName(), average, i);
}
}
set.clear();
}
public static long averageRun(int warmUpRuns, int sumCount, Collection<Subscription> set, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
WeakReference<Object> weakReference = new WeakReference<>(new Object());
while (weakReference.get() != null) {
System.gc();
Thread.sleep(100L);
}
results[i] = performanceRun(i, set, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, Collection<Subscription> set, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(set, repetitions);
threads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i=0;i<concurrency;i++) {
threads[i].start();
}
for (int i=0;i<concurrency;i++) {
threads[i].join();
}
long start = Long.MAX_VALUE;
long end = -1;
long count = 0;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (producers[i].end > end) {
end = producers[i].end;
}
count += producers[i].count;
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
if (showStats) {
System.out.format("%d (%d) - ops/sec=%,d\n", runNumber, count, ops);
}
return ops;
}
public static class Producer implements Runnable {
private final Collection<Subscription> set;
volatile long start;
volatile long end;
private int repetitions;
volatile int count;
public Producer(Collection<Subscription> set, int repetitions) {
this.set = set;
this.repetitions = repetitions;
}
@SuppressWarnings("unused")
@Override
public void run() {
Collection<Subscription> set = this.set;
int i = this.repetitions;
this.start = System.nanoTime();
// Entry<Subscription> current;
// Subscription sub;
int count = 0;
do {
for (Subscription sub : set) {
// if (sub.acceptsSubtypes()) {
// count--;
// } else {
count++;
// }
}
// current = set.head;
// while (current != null) {
// sub = current.getValue();
// current = current.next();
//
//// count++;
// }
} while (0 != --i);
this.end = System.nanoTime();
this.count = count;
}
}
@SuppressWarnings("unused")
public static class Listener {
@Handler
public void handleSync(Integer o1) {
}
@Handler(acceptVarargs=true)
public void handleSync(Object... o) {
}
}
}