- * Parameter length checking during publication is performed, so that you can have multiple handlers with the same signature, but each
- * with a different number of parameters
- */
-public class FirstArgSubscriber implements Subscriber {
-
- private final ErrorHandlingSupport errorHandler;
-
- private final SubscriptionUtils subUtils;
-
- // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
- // this is the primary list for dispatching a specific message
- // write access is synchronized and happens only when a listener of a specific class is registered the first time
-
- // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
- private final Map, ArrayList> subscriptionsPerMessage;
-
-
- public FirstArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
- this.errorHandler = errorHandler;
-
- // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
- this.subscriptionsPerMessage = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
-
- this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR);
- }
-
- // inside a write lock
- // add this subscription to each of the handled types
- // to activate this sub for publication
- @Override
- public void register(final Class> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
-
- final Map, ArrayList> subscriptions = this.subscriptionsPerMessage;
-
- Subscription subscription;
- MessageHandler handler;
- Class>[] messageHandlerTypes;
- int size;
-
- Class> type0;
- ArrayList subs;
-
- for (int i = 0; i < handlersSize; i++) {
- subscription = subsPerListener[i];
-
- // activate this subscription for publication
- // now add this subscription to each of the handled types
-
- // only register based on the FIRST parameter
- handler = subscription.getHandler();
- messageHandlerTypes = handler.getHandledMessages();
- size = messageHandlerTypes.length;
-
- if (size == 0) {
- errorHandler.handleError("Error while trying to subscribe class: " + messageHandlerTypes.getClass(), listenerClass);
- continue;
- }
-
- type0 = messageHandlerTypes[0];
- subs = subscriptions.get(type0);
- if (subs == null) {
- subs = new ArrayList();
-
- subscriptions.put(type0, subs);
- }
-
- subs.add(subscription);
- }
- }
-
- @Override
- public AtomicBoolean getVarArgPossibility() {
- return null;
- }
-
- @Override
- public VarArgUtils getVarArgUtils() {
- return null;
- }
-
- @Override
- public void shutdown() {
- this.subscriptionsPerMessage.clear();
- }
-
- @Override
- public void clear() {
-
- }
-
- // can return null
- @Override
- public ArrayList getExactAsArray(final Class> messageClass) {
- return subscriptionsPerMessage.get(messageClass);
- }
-
- // can return null
- @Override
- public ArrayList getExactAsArray(final Class> messageClass1, final Class> messageClass2) {
- return subscriptionsPerMessage.get(messageClass1);
- }
-
- // can return null
- @Override
- public ArrayList getExactAsArray(final Class> messageClass1, final Class> messageClass2,
- final Class> messageClass3) {
- return subscriptionsPerMessage.get(messageClass1);
- }
-
- // can return null
- @Override
- public
- Subscription[] getExact(final Class> messageClass) {
- final ArrayList collection = getExactAsArray(messageClass);
-
- if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
-
- return subscriptions;
- }
-
- return null;
- }
-
- // can return null
- @Override
- public Subscription[] getExact(final Class> messageClass1, final Class> messageClass2) {
- return null;
- }
-
- // can return null
- @Override
- public Subscription[] getExact(final Class> messageClass1, final Class> messageClass2, final Class> messageClass3) {
- return null;
- }
-
- // can return null
- @Override
- public Subscription[] getExactAndSuper(final Class> messageClass) {
- ArrayList collection = getExactAsArray(messageClass); // can return null
-
- // now publish superClasses
- final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
-
- if (collection != null) {
- collection = new ArrayList(collection);
-
- if (!superSubscriptions.isEmpty()) {
- collection.addAll(superSubscriptions);
- }
- }
- else if (!superSubscriptions.isEmpty()) {
- collection = superSubscriptions;
- }
-
- if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
- return subscriptions;
- }
- else {
- return null;
- }
- }
-
- @Override
- public Subscription[] getExactAndSuper(final Class> messageClass1, final Class> messageClass2) {
- ArrayList collection = getExactAsArray(messageClass1); // can return null
-
- // now publish superClasses
- final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null
-
- if (collection != null) {
- collection = new ArrayList(collection);
-
- if (!superSubscriptions.isEmpty()) {
- collection.addAll(superSubscriptions);
- }
- }
- else if (!superSubscriptions.isEmpty()) {
- collection = superSubscriptions;
- }
-
- if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
- return subscriptions;
- }
- else {
- return null;
- }
- }
-
- @Override
- public Subscription[] getExactAndSuper(final Class> messageClass1, final Class> messageClass2, final Class> messageClass3) {
- ArrayList collection = getExactAsArray(messageClass1); // can return null
-
- // now publish superClasses
- final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null
-
- if (collection != null) {
- collection = new ArrayList(collection);
-
- if (!superSubscriptions.isEmpty()) {
- collection.addAll(superSubscriptions);
- }
- }
- else if (!superSubscriptions.isEmpty()) {
- collection = superSubscriptions;
- }
-
- if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
- return subscriptions;
- }
- else {
- return null;
- }
- }
-}
diff --git a/src/dorkbox/util/messagebus/subscription/Subscriber.java b/src/dorkbox/util/messagebus/subscription/Subscriber.java
index 4687ada..2eb8c65 100644
--- a/src/dorkbox/util/messagebus/subscription/Subscriber.java
+++ b/src/dorkbox/util/messagebus/subscription/Subscriber.java
@@ -25,11 +25,13 @@ import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
+@SuppressWarnings("Duplicates")
public
class Subscriber {
public static final float LOAD_FACTOR = 0.8F;
@@ -42,13 +44,13 @@ class Subscriber {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
- private final Map, ArrayList> subscriptionsPerMessageSingle;
+ final ConcurrentMap, ArrayList> subscriptionsPerMessageSingle;
private final HashMapTree, ArrayList> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
- private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
+ final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
- private ThreadLocal> listCache = new ThreadLocal>() {
+ ThreadLocal> listCache = new ThreadLocal>() {
@Override
protected
ArrayList initialValue() {
@@ -63,7 +65,7 @@ class Subscriber {
this.errorHandler = errorHandler;
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
- this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR);
+ this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>();
this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR);
@@ -160,23 +162,6 @@ class Subscriber {
}
}
- public
- void register(final Class> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
-
- final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
- final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
- final AtomicBoolean varArgPossibility = this.varArgPossibility;
-
- Subscription subscription;
-
- for (int i = 0; i < handlersSize; i++) {
- subscription = subsPerListener[i];
-
- // activate this subscription for publication
- // now add this subscription to each of the handled types
- registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
- }
- }
public
void shutdown() {
@@ -207,6 +192,7 @@ class Subscriber {
final ArrayList collection = getExactAsArray(messageClass);
if (collection != null) {
+ // convert to Array because the subscriptions can change and we want safe iteration over the list
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java
index b762abb..c98e21b 100644
--- a/src/dorkbox/util/messagebus/subscription/Subscription.java
+++ b/src/dorkbox/util/messagebus/subscription/Subscription.java
@@ -71,15 +71,17 @@ class Subscription {
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
- private final MessageHandler handlerMetadata;
+ private final MessageHandler handler;
private final IHandlerInvocation invocation;
private final Collection listeners;
public
- Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) {
- this.handlerMetadata = handler;
-// this.listeners = new StrongConcurrentSetV8(16, loadFactor, stripeSize);
+ Subscription(final MessageHandler handler) {
+ this.handler = handler;
+
+// this.listeners = Collections.newSetFromMap(new ConcurrentHashMap(8)); // really bad performance
+// this.listeners = new StrongConcurrentSetV8(16, 0.7F, 8);
///this is by far, the fastest
this.listeners = new ConcurrentSkipListSet<>(new Comparator() {
@@ -87,6 +89,7 @@ class Subscription {
public
int compare(final Object o1, final Object o2) {
return Integer.compare(o1.hashCode(), o2.hashCode());
+// return 0;
}
});
// this.listeners = new StrongConcurrentSet(16, 0.85F);
@@ -104,7 +107,7 @@ class Subscription {
public
MessageHandler getHandler() {
- return handlerMetadata;
+ return handler;
}
public
@@ -133,8 +136,8 @@ class Subscription {
public
void publish(final Object message) throws Throwable {
- final MethodAccess handler = this.handlerMetadata.getHandler();
- final int handleIndex = this.handlerMetadata.getMethodIndex();
+ final MethodAccess handler = this.handler.getHandler();
+ final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator iterator;
@@ -149,8 +152,8 @@ class Subscription {
public
void publish(final Object message1, final Object message2) throws Throwable {
- final MethodAccess handler = this.handlerMetadata.getHandler();
- final int handleIndex = this.handlerMetadata.getMethodIndex();
+ final MethodAccess handler = this.handler.getHandler();
+ final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator iterator;
@@ -165,8 +168,8 @@ class Subscription {
public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
- final MethodAccess handler = this.handlerMetadata.getHandler();
- final int handleIndex = this.handlerMetadata.getMethodIndex();
+ final MethodAccess handler = this.handler.getHandler();
+ final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator iterator;
@@ -181,8 +184,8 @@ class Subscription {
public
void publishToSubscription(final Object... messages) throws Throwable {
- final MethodAccess handler = this.handlerMetadata.getHandler();
- final int handleIndex = this.handlerMetadata.getMethodIndex();
+ final MethodAccess handler = this.handler.getHandler();
+ final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator iterator;
diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
index caafe6a..7a18b41 100644
--- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
+++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
@@ -17,9 +17,11 @@ package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
-import dorkbox.util.messagebus.common.adapter.StampedLock;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
@@ -39,19 +41,15 @@ class SubscriptionManager {
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
- private final Map, Subscription[]> subscriptionsPerListener;
+ private final ConcurrentMap, Subscription[]> subscriptionsPerListener;
- private final StampedLock lock;
- private final int numberOfThreads;
private final Subscriber subscriber;
public
- SubscriptionManager(final int numberOfThreads, final Subscriber subscriber, final StampedLock lock) {
- this.numberOfThreads = numberOfThreads;
+ SubscriptionManager(final int numberOfThreads, final Subscriber subscriber) {
this.subscriber = subscriber;
- this.lock = lock;
// modified ONLY during SUB/UNSUB
@@ -85,11 +83,13 @@ class SubscriptionManager {
// these are concurrent collections
subscriber.clear();
- Subscription[] subscriptions = getListenerSubs(listenerClass);
+ // this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist.
+ // ONCE subscriptions are in THIS map, they are considered AVAILABLE.
+ Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
// the subscriptions from the map were null, so create them
if (subscriptions == null) {
- // it is important to note that this section CAN be repeated, however the write lock is gained before
+ // it is important to note that this section CAN be repeated.
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
@@ -101,50 +101,80 @@ class SubscriptionManager {
return;
}
- final Subscription[] subsPerListener = new Subscription[handlersSize];
- // create the subscription
- MessageHandler messageHandler;
+ final AtomicBoolean varArgPossibility = subscriber.varArgPossibility;
Subscription subscription;
+ MessageHandler messageHandler;
+ Class>[] messageHandlerTypes;
+ Class> handlerType;
+
+ // create the subscriptions
+ final ConcurrentMap, ArrayList> subsPerMessageSingle = subscriber.subscriptionsPerMessageSingle;
+ subscriptions = new Subscription[handlersSize];
+
for (int i = 0; i < handlersSize; i++) {
+ // THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
messageHandler = messageHandlers[i];
- // create the subscription
- subscription = new Subscription(messageHandler, Subscriber.LOAD_FACTOR, numberOfThreads);
- subscription.subscribe(listener);
+ // is this handler able to accept var args?
+ if (messageHandler.getVarArgClass() != null) {
+ varArgPossibility.lazySet(true);
+ }
- subsPerListener[i] = subscription; // activates this sub for sub/unsub
+ // now create a list of subscriptions for this specific handlerType (but don't add anything yet).
+ // we only store things based on the FIRST type (for lookup) then parse the rest of the types during publication
+ messageHandlerTypes = messageHandler.getHandledMessages();
+ handlerType = messageHandlerTypes[0];
+
+ // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
+ final ArrayList cachedSubs = subscriber.listCache.get();
+ ArrayList subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs);
+ if (subs == null) {
+ subscriber.listCache.set(new ArrayList(8));
+ }
+
+ // create the subscription. This can be thrown away if the subscription succeeds in another thread
+ subscription = new Subscription(messageHandler);
+ subscriptions[i] = subscription;
+
+ // now add this subscription to each of the handled types
}
- final Map, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
+ // now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
- // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
- // of the huge number of reads compared to writes.
+ // putIfAbsent
+ final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub
+ if (previousSubs != null) {
+ // another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure
+ // all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUT the correct list again.
+ // This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle.
- final StampedLock lock = this.lock;
- final long stamp = lock.writeLock();
+ // since nothing is yet "subscribed" we can assign the correct values for everything now
+ subscriptions = previousSubs;
+ } else {
+ // we can now safely add for publication AND subscribe since the data structures are consistent
+ for (int i = 0; i < handlersSize; i++) {
+ subscription = subscriptions[i];
+ subscription.subscribe(listener); // register this callback listener to this subscription
- subscriptions = subsPerListenerMap.get(listenerClass);
+ // THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
+ messageHandler = messageHandlers[i];
- // it was still null, so we actually have to create the rest of the subs
- if (subscriptions == null) {
- subscriber.register(listenerClass, handlersSize, subsPerListener); // this adds to subscriptionsPerMessage
+ // register for publication
+ messageHandlerTypes = messageHandler.getHandledMessages();
+ handlerType = messageHandlerTypes[0];
- subsPerListenerMap.put(listenerClass, subsPerListener);
- lock.unlockWrite(stamp);
+ // makes this subscription visible for publication
+ subsPerMessageSingle.get(handlerType).add(subscription);
+ }
return;
}
- else {
- // continue to subscription
- lock.unlockWrite(stamp);
- }
}
// subscriptions already exist and must only be updated
- // only publish here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
@@ -167,7 +197,7 @@ class SubscriptionManager {
// these are concurrent collections
subscriber.clear();
- final Subscription[] subscriptions = getListenerSubs(listenerClass);
+ final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
Subscription subscription;
@@ -177,16 +207,4 @@ class SubscriptionManager {
}
}
}
-
- private
- Subscription[] getListenerSubs(final Class> listenerClass) {
-
- final StampedLock lock = this.lock;
- final long stamp = lock.readLock();
-
- final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
-
- lock.unlockRead(stamp);
- return subscriptions;
- }
}
diff --git a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java
index 9dd212e..8cf2bca 100644
--- a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java
+++ b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java
@@ -42,7 +42,7 @@ class SubscriptionUtils {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, 1);
- this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor);
+ this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>();
}
public
diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
index 0254bfa..6960a7b 100644
--- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java
+++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
@@ -41,10 +41,10 @@ class VarArgUtils {
this.superClassUtils = superClassUtils;
this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
- this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor);
+ this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>();
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
- this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor);
+ this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>();
}
diff --git a/test/dorkbox/util/messagebus/ObjectTreeTest.java b/test/dorkbox/util/messagebus/ObjectTreeTest.java
index 5642d3a..c9d36e0 100644
--- a/test/dorkbox/util/messagebus/ObjectTreeTest.java
+++ b/test/dorkbox/util/messagebus/ObjectTreeTest.java
@@ -43,7 +43,7 @@ public class ObjectTreeTest extends AssertSupport {
@Test
public void testObjectTree() {
- HashMapTree, String> tree = new HashMapTree, String>(8, 0.8F);
+ HashMapTree, String> tree = new HashMapTree, String>();
test(tree, "s", String.class);
test(tree, "x", String.class);
diff --git a/test/dorkbox/util/messagebus/PerfTest_Collections.java b/test/dorkbox/util/messagebus/PerfTest_Collections.java
index 07c2424..3f7a601 100644
--- a/test/dorkbox/util/messagebus/PerfTest_Collections.java
+++ b/test/dorkbox/util/messagebus/PerfTest_Collections.java
@@ -89,7 +89,7 @@ class PerfTest_Collections {
for (int i = 0; i < size; i++) {
for (MessageHandler x : allHandlers) {
- set.add(new Subscription(x, .85F, 1));
+ set.add(new Subscription(x));
}
}
diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
index 248b386..5ed422a 100644
--- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
+++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
@@ -161,7 +161,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
- SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
+ SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles(
@@ -185,7 +185,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
- final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
+ final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java
deleted file mode 100644
index 39b1ae3..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class PerfTest_LinkedBlockingQueue {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
-
- public static void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS);
-
- final int warmupRuns = 4;
- final int runs = 3;
-
- for (int concurrency = 1; concurrency < 5; concurrency++) {
- final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
- long average = PerfTest_LinkedBlockingQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
- System.out.format("PerfTest_LinkedBlockingQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
- }
-
- for (int concurrency = 1; concurrency < 5; concurrency++) {
- final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
- long average = PerfTest_LinkedBlockingQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
- System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
- }
- }
-}
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java
deleted file mode 100644
index 45a4f2d..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-public
-class PerfTest_LinkedBlockingQueue_Block {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
- private static final int concurrency = 4;
-
- public static
- void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
-
- final int warmupRuns = 4;
- final int runs = 5;
-
- final LinkedBlockingQueue queue = new LinkedBlockingQueue(concurrency);
- long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
-
- System.out.format("summary,QueuePerfTest,%s %,d\n",
- queue.getClass()
- .getSimpleName(),
- average);
- }
-
- public static
- long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions)
- throws Exception {
- int runs = warmUpRuns + sumCount;
- final long[] results = new long[runs];
- for (int i = 0; i < runs; i++) {
- System.gc();
- results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
- }
- // only average last X results for summary
- long sum = 0;
- for (int i = warmUpRuns; i < runs; i++) {
- sum += results[i];
- }
-
- return sum / sumCount;
- }
-
- private static
- long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions)
- throws Exception {
-
- Producer[] producers = new Producer[concurrency];
- Consumer[] consumers = new Consumer[concurrency];
- Thread[] threads = new Thread[concurrency * 2];
-
- for (int i = 0; i < concurrency; i++) {
- producers[i] = new Producer(queue, repetitions);
- consumers[i] = new Consumer(queue, repetitions);
- }
-
- for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
- threads[j] = new Thread(producers[i], "Producer " + i);
- threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
- }
-
- for (int i = 0; i < concurrency * 2; i += 2) {
- threads[i].start();
- threads[i + 1].start();
- }
-
- for (int i = 0; i < concurrency * 2; i += 2) {
- threads[i].join();
- threads[i + 1].join();
- }
-
- long start = Long.MAX_VALUE;
- long end = -1;
-
- for (int i = 0; i < concurrency; i++) {
- if (producers[i].start < start) {
- start = producers[i].start;
- }
-
- if (consumers[i].end > end) {
- end = consumers[i].end;
- }
- }
-
-
- long duration = end - start;
- long ops = repetitions * 1_000_000_000L / duration;
- String qName = queue.getClass()
- .getSimpleName();
-
- if (showStats) {
- System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
- }
- return ops;
- }
-
- public static
- class Producer implements Runnable {
- private final LinkedBlockingQueue queue;
- volatile long start;
- private int repetitions;
-
- public
- Producer(LinkedBlockingQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public
- void run() {
- LinkedBlockingQueue producer = this.queue;
- int i = this.repetitions;
- this.start = System.nanoTime();
-
- try {
- do {
- producer.put(TEST_VALUE);
- } while (0 != --i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
-
- public static
- class Consumer implements Runnable {
- private final LinkedBlockingQueue queue;
- Object result;
- volatile long end;
- private int repetitions;
-
- public
- Consumer(LinkedBlockingQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public
- void run() {
- LinkedBlockingQueue consumer = this.queue;
- Object result = null;
- int i = this.repetitions;
-
- try {
- do {
- result = consumer.take();
- } while (0 != --i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- this.result = result;
- this.end = System.nanoTime();
- }
- }
-}
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java
deleted file mode 100644
index 151976d..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class PerfTest_LinkedBlockingQueue_NonBlock {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
- public static final int QUEUE_CAPACITY = 1 << 17;
-
- private static final int concurrency = 4;
-
- public static void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
-
- final int warmupRuns = 5;
- final int runs = 5;
-
- long average = 0;
-
- final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
- average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
-
- System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
- }
-
- public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
- int runs = warmUpRuns + sumCount;
- final long[] results = new long[runs];
- for (int i = 0; i < runs; i++) {
- System.gc();
- results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
- }
- // only average last X results for summary
- long sum = 0;
- for (int i = warmUpRuns; i < runs; i++) {
- sum += results[i];
- }
-
- return sum/sumCount;
- }
-
- private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
-
- Producer[] producers = new Producer[concurrency];
- Consumer[] consumers = new Consumer[concurrency];
- Thread[] threads = new Thread[concurrency*2];
-
- for (int i=0;i end) {
- end = consumers[i].end;
- }
- }
-
-
- long duration = end - start;
- long ops = repetitions * 1_000_000_000L / duration;
- String qName = queue.getClass().getSimpleName();
-
- if (showStats) {
- System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
- }
- return ops;
- }
-
- private
- PerfTest_LinkedBlockingQueue_NonBlock() {
- }
-
- public static class Producer implements Runnable {
- private final LinkedBlockingQueue queue;
- volatile long start;
- private int repetitions;
-
- public Producer(LinkedBlockingQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedBlockingQueue producer = this.queue;
- int i = this.repetitions;
- this.start = System.nanoTime();
-
- do {
- while (!producer.offer(TEST_VALUE)) {
- Thread.yield();
- }
- } while (0 != --i);
- }
- }
-
- public static class Consumer implements Runnable {
- private final LinkedBlockingQueue queue;
- Object result;
- volatile long end;
- private int repetitions;
-
- public Consumer(LinkedBlockingQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedBlockingQueue consumer = this.queue;
- Object result = null;
- int i = this.repetitions;
-
- do {
- while (null == (result = consumer.poll())) {
- Thread.yield();
- }
- } while (0 != --i);
-
- this.result = result;
- this.end = System.nanoTime();
- }
- }
-}
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java
deleted file mode 100644
index 1167f2a..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedTransferQueue;
-
-public class PerfTest_LinkedTransferQueue {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
-
- public static void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS);
-
- final int warmupRuns = 4;
- final int runs = 3;
-
- for (int concurrency = 1; concurrency < 5; concurrency++) {
- final LinkedTransferQueue queue = new LinkedTransferQueue();
- long average = PerfTest_LinkedTransferQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
- System.out.format("PerfTest_LinkedTransferQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
- }
-
- for (int concurrency = 1; concurrency < 5; concurrency++) {
- final LinkedTransferQueue queue = new LinkedTransferQueue();
- long average = PerfTest_LinkedTransferQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
- System.out.format("PerfTest_LinkedTransferQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
- }
- }
-}
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java
deleted file mode 100644
index c69ea68..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedTransferQueue;
-
-public class PerfTest_LinkedTransferQueue_Block {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
- private static final int concurrency = 4;
-
- public static void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
-
- final int warmupRuns = 4;
- final int runs = 5;
-
- final LinkedTransferQueue queue = new LinkedTransferQueue();
- long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
-
-
- System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
- }
-
- public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
- int runs = warmUpRuns + sumCount;
- final long[] results = new long[runs];
- for (int i = 0; i < runs; i++) {
- System.gc();
- results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
- }
- // only average last X results for summary
- long sum = 0;
- for (int i = warmUpRuns; i < runs; i++) {
- sum += results[i];
- }
-
- return sum/sumCount;
- }
-
- private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
-
- Producer[] producers = new Producer[concurrency];
- Consumer[] consumers = new Consumer[concurrency];
- Thread[] threads = new Thread[concurrency*2];
-
- for (int i=0;i end) {
- end = consumers[i].end;
- }
- }
-
-
- long duration = end - start;
- long ops = repetitions * 1_000_000_000L / duration;
- String qName = queue.getClass().getSimpleName();
-
- if (showStats) {
- System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
- }
- return ops;
- }
-
- public static class Producer implements Runnable {
- private final LinkedTransferQueue queue;
- volatile long start;
- private int repetitions;
-
- public Producer(LinkedTransferQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedTransferQueue producer = this.queue;
- int i = this.repetitions;
- this.start = System.nanoTime();
-
- try {
- do {
- producer.transfer(TEST_VALUE);
- } while (0 != --i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static class Consumer implements Runnable {
- private final LinkedTransferQueue queue;
- Object result;
- volatile long end;
- private int repetitions;
-
- public Consumer(LinkedTransferQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedTransferQueue consumer = this.queue;
- Object result = null;
- int i = this.repetitions;
-
- try {
- do {
- result = consumer.take();
- } while (0 != --i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- this.result = result;
- this.end = System.nanoTime();
- }
- }
-}
diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java
deleted file mode 100644
index 36dddbb..0000000
--- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package dorkbox.util.messagebus.queuePerf;
-
-import java.util.concurrent.LinkedTransferQueue;
-
-public class PerfTest_LinkedTransferQueue_NonBlock {
- public static final int REPETITIONS = 50 * 1000 * 100;
- public static final Integer TEST_VALUE = Integer.valueOf(777);
-
- public static final int QUEUE_CAPACITY = 1 << 17;
-
- private static final int concurrency = 4;
-
- public static void main(final String[] args) throws Exception {
- System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
-
- final int warmupRuns = 5;
- final int runs = 5;
-
- long average = 0;
-
- final LinkedTransferQueue queue = new LinkedTransferQueue();
- average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
-
- System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
- }
-
- public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
- int runs = warmUpRuns + sumCount;
- final long[] results = new long[runs];
- for (int i = 0; i < runs; i++) {
- System.gc();
- results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
- }
- // only average last X results for summary
- long sum = 0;
- for (int i = warmUpRuns; i < runs; i++) {
- sum += results[i];
- }
-
- return sum/sumCount;
- }
-
- private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
-
- Producer[] producers = new Producer[concurrency];
- Consumer[] consumers = new Consumer[concurrency];
- Thread[] threads = new Thread[concurrency*2];
-
- for (int i=0;i end) {
- end = consumers[i].end;
- }
- }
-
-
- long duration = end - start;
- long ops = repetitions * 1_000_000_000L / duration;
- String qName = queue.getClass().getSimpleName();
-
- if (showStats) {
- System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
- }
- return ops;
- }
-
- public static class Producer implements Runnable {
- private final LinkedTransferQueue queue;
- volatile long start;
- private int repetitions;
-
- public Producer(LinkedTransferQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedTransferQueue producer = this.queue;
- int i = this.repetitions;
- this.start = System.nanoTime();
-
- do {
- while (!producer.offer(TEST_VALUE)) {
- Thread.yield();
- }
- } while (0 != --i);
- }
- }
-
- public static class Consumer implements Runnable {
- private final LinkedTransferQueue queue;
- Object result;
- volatile long end;
- private int repetitions;
-
- public Consumer(LinkedTransferQueue queue, int repetitions) {
- this.queue = queue;
- this.repetitions = repetitions;
- }
-
- @Override
- public void run() {
- LinkedTransferQueue consumer = this.queue;
- Object result = null;
- int i = this.repetitions;
-
- do {
- while (null == (result = consumer.poll())) {
- Thread.yield();
- }
- } while (0 != --i);
-
- this.result = result;
- this.end = System.nanoTime();
- }
- }
-}