diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 8a6f529..1f9739e 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -9,7 +9,7 @@ import org.jctools.util.Pow2; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.NamedThreadFactory; -import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; import dorkbox.util.messagebus.common.thread.BooleanHolder; @@ -201,7 +201,7 @@ public class MultiMBassador implements IMessageBus { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); - StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass); + StrongConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass); BooleanHolder subsPublished = this.booleanThreadLocal.get(); subsPublished.bool = false; @@ -222,7 +222,7 @@ public class MultiMBassador implements IMessageBus { } if (!this.forceExactMatches) { - StrongConcurrentSetV8 superSubscriptions = manager.getSuperSubscriptions(messageClass); + StrongConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses if (superSubscriptions != null) { current = superSubscriptions.head; @@ -240,7 +240,7 @@ public class MultiMBassador implements IMessageBus { if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) { Object[] asArray = null; - StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass); + StrongConcurrentSet varargSubscriptions = manager.getVarArgSubscriptions(messageClass); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { asArray = (Object[]) Array.newInstance(messageClass, 1); asArray[0] = message; @@ -255,7 +255,7 @@ public class MultiMBassador implements IMessageBus { } } - StrongConcurrentSetV8 varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); + StrongConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (asArray == null) { @@ -277,7 +277,7 @@ public class MultiMBassador implements IMessageBus { if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) - StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + StrongConcurrentSet deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message); @@ -300,7 +300,7 @@ public class MultiMBassador implements IMessageBus { Class messageClass1 = message1.getClass(); Class messageClass2 = message2.getClass(); - StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); + StrongConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); BooleanHolder subsPublished = this.booleanThreadLocal.get(); subsPublished.bool = false; @@ -320,7 +320,7 @@ public class MultiMBassador implements IMessageBus { } if (!this.forceExactMatches) { - StrongConcurrentSetV8 superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); + StrongConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // now get superClasses if (superSubscriptions != null) { current = superSubscriptions.head; @@ -338,7 +338,7 @@ public class MultiMBassador implements IMessageBus { if (messageClass1 == messageClass2) { Object[] asArray = null; - StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); + StrongConcurrentSet varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { asArray = (Object[]) Array.newInstance(messageClass1, 2); asArray[0] = message1; @@ -354,7 +354,7 @@ public class MultiMBassador implements IMessageBus { } } - StrongConcurrentSetV8 varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (asArray == null) { @@ -373,7 +373,7 @@ public class MultiMBassador implements IMessageBus { } } } else { - StrongConcurrentSetV8 varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); + StrongConcurrentSet varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { @@ -400,7 +400,7 @@ public class MultiMBassador implements IMessageBus { if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) - StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + StrongConcurrentSet deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -424,7 +424,7 @@ public class MultiMBassador implements IMessageBus { Class messageClass2 = message2.getClass(); Class messageClass3 = message3.getClass(); - StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); + StrongConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); BooleanHolder subsPublished = this.booleanThreadLocal.get(); subsPublished.bool = false; @@ -445,7 +445,7 @@ public class MultiMBassador implements IMessageBus { if (!this.forceExactMatches) { - StrongConcurrentSetV8 superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); + StrongConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // now get superClasses if (superSubscriptions != null) { current = superSubscriptions.head; @@ -462,7 +462,7 @@ public class MultiMBassador implements IMessageBus { if (manager.hasVarArgPossibility()) { if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { Object[] asArray = null; - StrongConcurrentSetV8 varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); + StrongConcurrentSet varargSubscriptions = manager.getVarArgSubscriptions(messageClass1); if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { asArray = (Object[]) Array.newInstance(messageClass1, 3); asArray[0] = message1; @@ -479,7 +479,7 @@ public class MultiMBassador implements IMessageBus { } } - StrongConcurrentSetV8 varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { if (asArray == null) { @@ -499,7 +499,7 @@ public class MultiMBassador implements IMessageBus { } } } else { - StrongConcurrentSetV8 varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); + StrongConcurrentSet varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // now get array based superClasses (but only if those ALSO accept vararg) if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { @@ -527,7 +527,7 @@ public class MultiMBassador implements IMessageBus { if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) - StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + StrongConcurrentSet deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message1, message2, message3); diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index e0fa1be..e820ee9 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -7,6 +7,7 @@ import java.util.concurrent.ConcurrentMap; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.ISetEntry; +import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.SubscriptionUtils; import dorkbox.util.messagebus.common.VarArgPossibility; @@ -49,25 +50,25 @@ public class SubscriptionManager { // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final ConcurrentMap, StrongConcurrentSetV8> subscriptionsPerMessageSingle; - private final HashMapTree, StrongConcurrentSetV8> subscriptionsPerMessageMulti; + private final ConcurrentMap, StrongConcurrentSet> subscriptionsPerMessageSingle; + private final HashMapTree, StrongConcurrentSet> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change - private final ConcurrentMap, StrongConcurrentSetV8> subscriptionsPerListener; + private final ConcurrentMap, StrongConcurrentSet> subscriptionsPerListener; // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private final ConcurrentMap, StrongConcurrentSetV8> superClassSubscriptions; - private final HashMapTree, StrongConcurrentSetV8> superClassSubscriptionsMulti; + private final ConcurrentMap, StrongConcurrentSet> superClassSubscriptions; + private final HashMapTree, StrongConcurrentSet> superClassSubscriptionsMulti; - private final ConcurrentMap, StrongConcurrentSetV8> varArgSubscriptions; - private final ConcurrentMap, StrongConcurrentSetV8> varArgSuperClassSubscriptions; - private final HashMapTree, StrongConcurrentSetV8> varArgSuperClassSubscriptionsMulti; + private final ConcurrentMap, StrongConcurrentSet> varArgSubscriptions; + private final ConcurrentMap, StrongConcurrentSet> varArgSuperClassSubscriptions; + private final HashMapTree, StrongConcurrentSet> varArgSuperClassSubscriptionsMulti; // stripe size of maps for concurrency private final int STRIPE_SIZE; @@ -84,29 +85,29 @@ public class SubscriptionManager { { this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, SubscriptionManager.LOAD_FACTOR); - this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); + this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, StrongConcurrentSet>(64, SubscriptionManager.LOAD_FACTOR, 1); + this.subscriptionsPerMessageMulti = new HashMapTree, StrongConcurrentSet>(4, SubscriptionManager.LOAD_FACTOR); // only used during SUB/UNSUB - this.subscriptionsPerListener = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, 1); + this.subscriptionsPerListener = new ConcurrentHashMapV8, StrongConcurrentSet>(64, SubscriptionManager.LOAD_FACTOR, 1); } // modified by N threads { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.superClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); + this.superClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSet>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.superClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSet>(4, SubscriptionManager.LOAD_FACTOR); // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSetV8>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.varArgSuperClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); + this.varArgSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSet>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8, StrongConcurrentSet>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.varArgSuperClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSet>(4, SubscriptionManager.LOAD_FACTOR); } - this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); - this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR); + this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR); } public void shutdown() { @@ -144,11 +145,11 @@ public class SubscriptionManager { // the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, // but for concurrency because there are race conditions here if we don't. synchronized(listenerClass) { - ConcurrentMap, StrongConcurrentSetV8> subsPerListener2 = this.subscriptionsPerListener; - StrongConcurrentSetV8 subsPerListener = subsPerListener2.get(listenerClass); + ConcurrentMap, StrongConcurrentSet> subsPerListener2 = this.subscriptionsPerListener; + StrongConcurrentSet subsPerListener = subsPerListener2.get(listenerClass); if (subsPerListener == null) { // a listener is subscribed for the first time - StrongConcurrentSetV8 messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); + StrongConcurrentSet messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); int handlersSize = messageHandlers.size(); if (handlersSize == 0) { @@ -158,8 +159,8 @@ public class SubscriptionManager { } else { VarArgPossibility varArgPossibility = this.varArgPossibility; - subsPerListener = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1); - ConcurrentMap, StrongConcurrentSetV8> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + subsPerListener = new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR); + ConcurrentMap, StrongConcurrentSet> subsPerMessageSingle = this.subscriptionsPerMessageSingle; ISetEntry current = messageHandlers.head; MessageHandler messageHandler; @@ -167,7 +168,7 @@ public class SubscriptionManager { messageHandler = current.getValue(); current = current.next(); - StrongConcurrentSetV8 subsPerType = null; + StrongConcurrentSet subsPerType = null; // now add this subscription to each of the handled types Class[] types = messageHandler.getHandledMessages(); @@ -178,7 +179,7 @@ public class SubscriptionManager { SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; subsPerType = subHolderConcurrent.get(); - StrongConcurrentSetV8 putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0],subsPerType); + StrongConcurrentSet putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { @@ -196,7 +197,7 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); + StrongConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { @@ -211,7 +212,7 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); + StrongConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { @@ -227,7 +228,7 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); + StrongConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { @@ -283,7 +284,7 @@ public class SubscriptionManager { clearConcurrentCollections(); synchronized(listenerClass) { - StrongConcurrentSetV8 subscriptions = this.subscriptionsPerListener.get(listenerClass); + StrongConcurrentSet subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { ISetEntry current = subscriptions.head; Subscription subscription; @@ -305,18 +306,18 @@ public class SubscriptionManager { } // CAN RETURN NULL - public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType) { + public final StrongConcurrentSet getSubscriptionsByMessageType(Class messageType) { return this.subscriptionsPerMessageSingle.get(messageType); } // CAN RETURN NULL - public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType1, Class messageType2) { + public final StrongConcurrentSet getSubscriptionsByMessageType(Class messageType1, Class messageType2) { return this.subscriptionsPerMessageMulti.get(messageType1, messageType2); } // CAN RETURN NULL - public final StrongConcurrentSetV8 getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { + public final StrongConcurrentSet getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); } @@ -328,15 +329,15 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions - public StrongConcurrentSetV8 getVarArgSubscriptions(Class messageClass) { - ConcurrentMap, StrongConcurrentSetV8> local = this.varArgSubscriptions; + public StrongConcurrentSet getVarArgSubscriptions(Class messageClass) { + ConcurrentMap, StrongConcurrentSet> local = this.varArgSubscriptions; // whenever our subscriptions change, this map is cleared. SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - StrongConcurrentSetV8 subsPerType = subHolderConcurrent.get(); + StrongConcurrentSet subsPerType = subHolderConcurrent.get(); // cache our subscriptions for super classes, so that their access can be fast! - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(messageClass, subsPerType); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); if (putIfAbsent == null) { // we are the first one in the map subHolderConcurrent.set(subHolderConcurrent.initialValue()); @@ -344,12 +345,12 @@ public class SubscriptionManager { // this caches our array type. This is never cleared. Class arrayVersion = this.utils.getArrayClass(messageClass); - Map, StrongConcurrentSetV8> local2 = this.subscriptionsPerMessageSingle; + Map, StrongConcurrentSet> local2 = this.subscriptionsPerMessageSingle; ISetEntry current; Subscription sub; - StrongConcurrentSetV8 subs = local2.get(arrayVersion); + StrongConcurrentSet subs = local2.get(arrayVersion); if (subs != null) { current = subs.head; while (current != null) { @@ -371,27 +372,27 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public StrongConcurrentSetV8 getVarArgSuperSubscriptions(Class messageClass) { + public StrongConcurrentSet getVarArgSuperSubscriptions(Class messageClass) { // whenever our subscriptions change, this map is cleared. - ConcurrentMap, StrongConcurrentSetV8> local = this.varArgSuperClassSubscriptions; + ConcurrentMap, StrongConcurrentSet> local = this.varArgSuperClassSubscriptions; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - StrongConcurrentSetV8 subsPerType = subHolderConcurrent.get(); + StrongConcurrentSet subsPerType = subHolderConcurrent.get(); // cache our subscriptions for super classes, so that their access can be fast! - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(messageClass, subsPerType); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(messageClass, subsPerType); if (putIfAbsent == null) { // we are the first one in the map subHolderConcurrent.set(subHolderConcurrent.initialValue()); Class arrayVersion = this.utils.getArrayClass(messageClass); - StrongConcurrentSetV8> types = this.utils.getSuperClasses(arrayVersion, true); + StrongConcurrentSet> types = this.utils.getSuperClasses(arrayVersion, true); if (types.isEmpty()) { return null; } - Map, StrongConcurrentSetV8> local2 = this.subscriptionsPerMessageSingle; + Map, StrongConcurrentSet> local2 = this.subscriptionsPerMessageSingle; ISetEntry current; Subscription sub; @@ -404,7 +405,7 @@ public class SubscriptionManager { superClass = current1.getValue(); current1 = current1.next(); - StrongConcurrentSetV8 subs = local2.get(superClass); + StrongConcurrentSet subs = local2.get(superClass); if (subs != null) { current = subs.head; while (current != null) { @@ -428,12 +429,12 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public StrongConcurrentSetV8 getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { - HashMapTree, StrongConcurrentSetV8> local = this.varArgSuperClassSubscriptionsMulti; + public StrongConcurrentSet getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { + HashMapTree, StrongConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); - StrongConcurrentSetV8 subsPerType = null; + HashMapTree, StrongConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2); + StrongConcurrentSet subsPerType = null; // we DO NOT care about duplicate, because the answers will be the same if (subsPerTypeLeaf != null) { @@ -443,15 +444,15 @@ public class SubscriptionManager { SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; subsPerType = subHolderConcurrent.get(); - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); if (putIfAbsent != null) { // someone beat us subsPerType = putIfAbsent; } else { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - StrongConcurrentSetV8 varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); - StrongConcurrentSetV8 varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); + StrongConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); ISetEntry current; Subscription sub; @@ -476,12 +477,12 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public StrongConcurrentSetV8 getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - HashMapTree, StrongConcurrentSetV8> local = this.varArgSuperClassSubscriptionsMulti; + public StrongConcurrentSet getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + HashMapTree, StrongConcurrentSet> local = this.varArgSuperClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); - StrongConcurrentSetV8 subsPerType = null; + HashMapTree, StrongConcurrentSet> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3); + StrongConcurrentSet subsPerType = null; // we DO NOT care about duplicate, because the answers will be the same if (subsPerTypeLeaf != null) { @@ -491,16 +492,16 @@ public class SubscriptionManager { SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; subsPerType = subHolderConcurrent.get(); - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); if (putIfAbsent != null) { // someone beat us subsPerType = putIfAbsent; } else { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - StrongConcurrentSetV8 varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); - StrongConcurrentSetV8 varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); - StrongConcurrentSetV8 varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); + StrongConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); + StrongConcurrentSet varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); ISetEntry current; Subscription sub; @@ -525,18 +526,18 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public final StrongConcurrentSetV8 getSuperSubscriptions(Class superType) { + public final StrongConcurrentSet getSuperSubscriptions(Class superType) { // whenever our subscriptions change, this map is cleared. - ConcurrentMap, StrongConcurrentSetV8> local = this.superClassSubscriptions; + ConcurrentMap, StrongConcurrentSet> local = this.superClassSubscriptions; SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - StrongConcurrentSetV8 subsPerType = subHolderConcurrent.get(); + StrongConcurrentSet subsPerType = subHolderConcurrent.get(); // cache our subscriptions for super classes, so that their access can be fast! - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(superType, subsPerType); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(superType, subsPerType); if (putIfAbsent == null) { // we are the first one in the map - StrongConcurrentSetV8> types = this.utils.getSuperClasses(superType); + StrongConcurrentSet> types = this.utils.getSuperClasses(superType); if (types.isEmpty()) { return null; } @@ -544,7 +545,7 @@ public class SubscriptionManager { // we are the first one in the map subHolderConcurrent.set(subHolderConcurrent.initialValue()); - Map, StrongConcurrentSetV8> local2 = this.subscriptionsPerMessageSingle; + Map, StrongConcurrentSet> local2 = this.subscriptionsPerMessageSingle; ISetEntry current; Subscription sub; @@ -557,7 +558,7 @@ public class SubscriptionManager { superClass = current1.getValue(); current1 = current1.next(); - StrongConcurrentSetV8 subs = local2.get(superClass); + StrongConcurrentSet subs = local2.get(superClass); if (subs != null) { current = subs.head; while (current != null) { @@ -580,12 +581,12 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2) { - HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; + public StrongConcurrentSet getSuperSubscriptions(Class superType1, Class superType2) { + HashMapTree, StrongConcurrentSet> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(superType1, superType2); - StrongConcurrentSetV8 subsPerType = null; + HashMapTree, StrongConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2); + StrongConcurrentSet subsPerType = null; // we DO NOT care about duplicate, because the answers will be the same if (subsPerTypeLeaf != null) { @@ -596,18 +597,18 @@ public class SubscriptionManager { subsPerType = subHolderSingle.get(); // cache our subscriptions for super classes, so that their access can be fast! - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); if (putIfAbsent == null) { // we are the first one in the map subHolderSingle.set(subHolderSingle.initialValue()); // whenever our subscriptions change, this map is cleared. - StrongConcurrentSetV8> types1 = this.utils.getSuperClasses(superType1); - StrongConcurrentSetV8> types2 = this.utils.getSuperClasses(superType2); + StrongConcurrentSet> types1 = this.utils.getSuperClasses(superType1); + StrongConcurrentSet> types2 = this.utils.getSuperClasses(superType2); - StrongConcurrentSetV8 subs; - HashMapTree, StrongConcurrentSetV8> leaf1; - HashMapTree, StrongConcurrentSetV8> leaf2; + StrongConcurrentSet subs; + HashMapTree, StrongConcurrentSet> leaf1; + HashMapTree, StrongConcurrentSet> leaf2; ISetEntry current = null; Subscription sub; @@ -673,12 +674,12 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { - HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; + public StrongConcurrentSet getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { + HashMapTree, StrongConcurrentSet> local = this.superClassSubscriptionsMulti; // whenever our subscriptions change, this map is cleared. - HashMapTree, StrongConcurrentSetV8> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); - StrongConcurrentSetV8 subsPerType; + HashMapTree, StrongConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); + StrongConcurrentSet subsPerType; // we DO NOT care about duplicate, because the answers will be the same @@ -690,19 +691,19 @@ public class SubscriptionManager { subsPerType = subHolderSingle.get(); // cache our subscriptions for super classes, so that their access can be fast! - StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); + StrongConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); if (putIfAbsent == null) { // we are the first one in the map subHolderSingle.set(subHolderSingle.initialValue()); - StrongConcurrentSetV8> types1 = this.utils.getSuperClasses(superType1); - StrongConcurrentSetV8> types2 = this.utils.getSuperClasses(superType2); - StrongConcurrentSetV8> types3 = this.utils.getSuperClasses(superType3); + StrongConcurrentSet> types1 = this.utils.getSuperClasses(superType1); + StrongConcurrentSet> types2 = this.utils.getSuperClasses(superType2); + StrongConcurrentSet> types3 = this.utils.getSuperClasses(superType3); - StrongConcurrentSetV8 subs; - HashMapTree, StrongConcurrentSetV8> leaf1; - HashMapTree, StrongConcurrentSetV8> leaf2; - HashMapTree, StrongConcurrentSetV8> leaf3; + StrongConcurrentSet subs; + HashMapTree, StrongConcurrentSet> leaf1; + HashMapTree, StrongConcurrentSet> leaf2; + HashMapTree, StrongConcurrentSet> leaf3; ISetEntry current = null; Subscription sub; diff --git a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java index 0d42829..fb1e5c6 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java @@ -17,7 +17,7 @@ import dorkbox.util.messagebus.annotations.Handler; public class ReflectionUtils { public static StrongConcurrentSetV8 getMethods(Class target) { - StrongConcurrentSetV8 hashSet = new StrongConcurrentSetV8(16, .8F, 1); + StrongConcurrentSetV8 hashSet = new StrongConcurrentSetV8(16, .8F); getMethods(target, hashSet); return hashSet; } @@ -67,7 +67,7 @@ public class ReflectionUtils { * @return A set of classes, each representing a super type of the root class */ public static StrongConcurrentSetV8> getSuperTypes(Class from) { - StrongConcurrentSetV8> superclasses = new StrongConcurrentSetV8>(8, 0.8F, 1); + StrongConcurrentSetV8> superclasses = new StrongConcurrentSetV8>(8, 0.8F); collectInterfaces( from, superclasses ); @@ -131,7 +131,7 @@ public class ReflectionUtils { } public static A getAnnotation( AnnotatedElement from, Class annotationType) { - A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8(16, .8F, 1)); + A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8(16, .8F)); return annotation; } diff --git a/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java b/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java index 6c573a6..0a0f3bc 100644 --- a/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java +++ b/src/main/java/dorkbox/util/messagebus/common/StrongConcurrentSetV8.java @@ -9,7 +9,8 @@ package dorkbox.util.messagebus.common; */ public class StrongConcurrentSetV8 extends StrongConcurrentSet { - public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) { - super(new ConcurrentHashMapV8>(size, loadFactor, stripeSize)); + public StrongConcurrentSetV8(int size, float loadFactor) { + // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks) + super(new ConcurrentHashMapV8>(size, loadFactor, 1)); } } \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java index 2dbf7c8..30df869 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java @@ -11,7 +11,7 @@ public class SubscriptionUtils { private final Map, Class> arrayVersionCache; private final Map, Boolean> isArrayCache; - private final ConcurrentMap, StrongConcurrentSetV8>> superClassesCache; + private final ConcurrentMap, StrongConcurrentSet>> superClassesCache; private final ClassHolder classHolderSingle; @@ -19,8 +19,8 @@ public class SubscriptionUtils { this.arrayVersionCache = new ConcurrentHashMapV8, Class>(64, loadFactor, stripeSize); this.isArrayCache = new ConcurrentHashMapV8, Boolean>(64, loadFactor, stripeSize); - this.superClassesCache = new ConcurrentHashMapV8, StrongConcurrentSetV8>>(64, loadFactor, stripeSize); - this.classHolderSingle = new ClassHolder(loadFactor, stripeSize); + this.superClassesCache = new ConcurrentHashMapV8, StrongConcurrentSet>>(64, loadFactor, stripeSize); + this.classHolderSingle = new ClassHolder(loadFactor); } /** @@ -28,24 +28,24 @@ public class SubscriptionUtils { * never returns null * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) */ - public StrongConcurrentSetV8> getSuperClasses(Class clazz) { + public StrongConcurrentSet> getSuperClasses(Class clazz) { return getSuperClasses(clazz, isArray(clazz)); } - public final StrongConcurrentSetV8> getSuperClasses(Class clazz, boolean isArray) { + public final StrongConcurrentSet> getSuperClasses(Class clazz, boolean isArray) { // this is never reset, since it never needs to be. - ConcurrentMap, StrongConcurrentSetV8>> local = this.superClassesCache; + ConcurrentMap, StrongConcurrentSet>> local = this.superClassesCache; ClassHolder classHolderSingle = this.classHolderSingle; - StrongConcurrentSetV8> classes = classHolderSingle.get(); + StrongConcurrentSet> classes = classHolderSingle.get(); - StrongConcurrentSetV8> putIfAbsent = local.putIfAbsent(clazz, classes); + StrongConcurrentSet> putIfAbsent = local.putIfAbsent(clazz, classes); if (putIfAbsent == null) { // we are the first one in the map classHolderSingle.set(classHolderSingle.initialValue()); // it doesn't matter if concurrent access stomps on values, since they are always the same. - StrongConcurrentSetV8> superTypes = ReflectionUtils.getSuperTypes(clazz); + StrongConcurrentSet> superTypes = ReflectionUtils.getSuperTypes(clazz); ISetEntry> current = superTypes.head; Class c; diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java index e116d3b..ae192e5 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java @@ -5,19 +5,17 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8; public class ClassHolder extends ThreadLocal>> { - private final int stripeSize; private final float loadFactor; - public ClassHolder(float loadFactor, int stripeSize) { + public ClassHolder(float loadFactor) { super(); - this.stripeSize = stripeSize; this.loadFactor = loadFactor; } @Override public StrongConcurrentSetV8> initialValue() { - return new StrongConcurrentSetV8>(16, this.loadFactor, this.stripeSize); + return new StrongConcurrentSetV8>(16, this.loadFactor); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java index ae9a47a..c3a9136 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java @@ -5,19 +5,17 @@ import dorkbox.util.messagebus.subscription.Subscription; public class SubscriptionHolder extends ThreadLocal> { - private final int stripeSize; private final float loadFactor; - public SubscriptionHolder(float loadFactor, int stripeSize) { + public SubscriptionHolder(float loadFactor) { super(); - this.stripeSize = stripeSize; this.loadFactor = loadFactor; } @Override public StrongConcurrentSetV8 initialValue() { - return new StrongConcurrentSetV8(16, this.loadFactor, this.stripeSize); + return new StrongConcurrentSetV8(16, this.loadFactor); } } diff --git a/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java b/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java index 52763de..2b99949 100644 --- a/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java +++ b/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java @@ -22,7 +22,7 @@ public class MessageListener { private Class listenerDefinition; public MessageListener(Class listenerDefinition, int size) { - this.handlers = new StrongConcurrentSetV8(size, 0.8F, 1); + this.handlers = new StrongConcurrentSetV8(size, 0.8F); this.listenerDefinition = listenerDefinition; } diff --git a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java b/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java index 8e38705..6165138 100644 --- a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java +++ b/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java @@ -25,7 +25,7 @@ public class MetadataReader { StrongConcurrentSetV8 allHandlers = ReflectionUtils.getMethods(target); // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) - StrongConcurrentSetV8 bottomMostHandlers = new StrongConcurrentSetV8(allHandlers.size(), 0.8F, 1); + StrongConcurrentSetV8 bottomMostHandlers = new StrongConcurrentSetV8(allHandlers.size(), 0.8F); ISetEntry current = allHandlers.head; diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java new file mode 100644 index 0000000..7c145eb --- /dev/null +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java @@ -0,0 +1,207 @@ +package dorkbox.util.messagebus; + +import java.lang.ref.WeakReference; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedTransferQueue; + +import dorkbox.util.messagebus.annotations.Handler; +import dorkbox.util.messagebus.common.ConcurrentHashMapV8; +import dorkbox.util.messagebus.common.StrongConcurrentSet; +import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import dorkbox.util.messagebus.listener.MessageHandler; +import dorkbox.util.messagebus.listener.MessageListener; +import dorkbox.util.messagebus.listener.MetadataReader; +import dorkbox.util.messagebus.subscription.Subscription; + + +public class PerfTest_Collections { + public static final int REPETITIONS = 10 * 1000 * 100; + public static final Integer TEST_VALUE = Integer.valueOf(777); + + private static final float LOAD_FACTOR = 0.8F; + private static MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class); + + public static void main(final String[] args) throws Exception { + final int size = 16; + + System.out.println("reps:" + REPETITIONS + " size: " + size); + + // have to warm-up the JVM. + System.err.print("\nWarming up JVM."); +// for (int i=0;i<2;i++) { + bench(size, new ConcurrentLinkedQueue(), false); + System.err.print("."); + bench(size, new ArrayList(size*2), false); + System.err.print("."); + bench(size, new ArrayDeque(size*2), false); + System.err.print("."); + bench(size, new ConcurrentLinkedQueue(), false); + System.err.print("."); + bench(size, new LinkedList(), false); + System.err.print("."); +// } + System.err.println("Done"); + + bench(size, new ArrayList(size*2)); + bench(size, new ConcurrentLinkedQueue()); + bench(size, new LinkedTransferQueue()); + bench(size, new ArrayDeque(size*2)); + bench(size, new ConcurrentLinkedQueue()); + bench(size, new LinkedList()); + bench(size, new StrongConcurrentSetV8(size*2, LOAD_FACTOR)); + bench(size, Collections.newSetFromMap(new ConcurrentHashMapV8(size*2, LOAD_FACTOR, 1))); + bench(size, new StrongConcurrentSet(size*2, LOAD_FACTOR)); + bench(size, new HashSet()); +// bench(size, new ConcurrentSkipListSet()); // needs comparable + } + + public static void bench(final int size, Collection set) throws Exception { + bench(size, set, true); + } + public static void bench(final int size, Collection set, boolean showOutput) throws Exception { + final int warmupRuns = 2; + final int runs = 3; + + for (int i=0;i set, boolean showStats, int concurrency, int repetitions) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { + WeakReference weakReference = new WeakReference<>(new Object()); + while (weakReference.get() != null) { + System.gc(); + Thread.sleep(100L); + } + results[i] = performanceRun(i, set, showStats, concurrency, repetitions); + } + // only average last X results for summary + long sum = 0; + for (int i = warmUpRuns; i < runs; i++) { + sum += results[i]; + } + + return sum/sumCount; + } + + private static long performanceRun(int runNumber, Collection set, boolean showStats, int concurrency, int repetitions) throws Exception { + + Producer[] producers = new Producer[concurrency]; + Thread[] threads = new Thread[concurrency*2]; + + for (int i=0;i end) { + end = producers[i].end; + } + + count += producers[i].count; + } + + + long duration = end - start; + long ops = repetitions * 1_000_000_000L / duration; + + if (showStats) { + System.out.format("%d (%d) - ops/sec=%,d\n", runNumber, count, ops); + } + return ops; + } + + public static class Producer implements Runnable { + private final Collection set; + volatile long start; + volatile long end; + private int repetitions; + volatile int count; + + public Producer(Collection set, int repetitions) { + this.set = set; + this.repetitions = repetitions; + } + + @SuppressWarnings("unused") + @Override + public void run() { + Collection set = this.set; + int i = this.repetitions; + this.start = System.nanoTime(); + +// Entry current; +// Subscription sub; + int count = 0; + + do { + for (Subscription sub : set) { + // if (sub.acceptsSubtypes()) { +// count--; +// } else { + count++; +// } + } + +// current = set.head; +// while (current != null) { +// sub = current.getValue(); +// current = current.next(); +// +//// count++; +// } + } while (0 != --i); + + this.end = System.nanoTime(); + this.count = count; + } + } + + @SuppressWarnings("unused") + public static class Listener { + @Handler + public void handleSync(Integer o1) { + } + + @Handler(acceptVarargs=true) + public void handleSync(Object... o) { + } + } +}