diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java
index f92fc61..325af88 100644
--- a/src/dorkbox/util/messagebus/MessageBus.java
+++ b/src/dorkbox/util/messagebus/MessageBus.java
@@ -22,7 +22,7 @@ import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
-import dorkbox.util.messagebus.subscription.WriterDistruptor;
+import dorkbox.util.messagebus.subscription.SubscriptionWriterDistruptor;
import dorkbox.util.messagebus.synchrony.AsyncDisruptor;
import dorkbox.util.messagebus.synchrony.Sync;
import dorkbox.util.messagebus.synchrony.Synchrony;
@@ -31,7 +31,7 @@ import dorkbox.util.messagebus.synchrony.Synchrony;
* The base class for all message bus implementations with support for asynchronous message dispatch.
*
* See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
- * tldr; we use single-writer-principle + Atomic.lazySet
+ * tldr; we use single-writer-principle + lazySet/get
*
* @author dorkbox, llc
* Date: 2/2/15
@@ -40,7 +40,7 @@ public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
- private final WriterDistruptor subscriptionWriter;
+ private final SubscriptionWriterDistruptor subscriptionWriter;
private final SubscriptionManager subscriptionManager;
@@ -93,7 +93,7 @@ class MessageBus implements IMessageBus {
*/
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler);
- subscriptionWriter = new WriterDistruptor(errorHandler, subscriptionManager);
+ subscriptionWriter = new SubscriptionWriterDistruptor(errorHandler, subscriptionManager);
switch (publishMode) {
@@ -133,7 +133,7 @@ class MessageBus implements IMessageBus {
return;
}
-// subscriptionManager.subscribe(listener);
+ // single writer principle
subscriptionWriter.subscribe(listener);
}
@@ -144,7 +144,7 @@ class MessageBus implements IMessageBus {
return;
}
-// subscriptionManager.unsubscribe(listener);
+ // single writer principle
subscriptionWriter.unsubscribe(listener);
}
diff --git a/src/dorkbox/util/messagebus/common/MessageHandler.java b/src/dorkbox/util/messagebus/common/MessageHandler.java
index 964f184..d8fdfa8 100644
--- a/src/dorkbox/util/messagebus/common/MessageHandler.java
+++ b/src/dorkbox/util/messagebus/common/MessageHandler.java
@@ -58,7 +58,6 @@ import java.util.Arrays;
*
* BECAUSE OF THIS, we always treat the two the same
*
- *
*
* @author bennidi
* Date: 11/14/12
@@ -67,14 +66,12 @@ import java.util.Arrays;
*/
public
class MessageHandler {
- // publish all listeners defined by the given class (includes
- // listeners defined in super classes)
+ // publish all listeners defined by the given class (includes listeners defined in super classes)
public static
-//cache this?
- MessageHandler[] get(final Class> target) {
+ MessageHandler[] get(final Class> messageClass) {
// publish all handlers (this will include all (inherited) methods directly annotated using @Handler)
- final Method[] allMethods = ReflectionUtils.getMethods(target);
+ final Method[] allMethods = ReflectionUtils.getMethods(messageClass);
final int length = allMethods.length;
final ArrayList finalMethods = new ArrayList(length);
@@ -94,7 +91,7 @@ class MessageHandler {
continue;
}
- Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
+ Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, messageClass);
if (overriddenHandler == null) {
overriddenHandler = method;
}
diff --git a/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java b/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java
deleted file mode 100644
index 46f28da..0000000
--- a/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2015 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.util.messagebus.common.adapter;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public
-class Java6Adapter implements MapAdapter {
-
- @Override
- public final
- ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) {
- return new ConcurrentHashMap(size, loadFactor, stripeSize);
- }
-}
diff --git a/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java b/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java
deleted file mode 100644
index 58977d3..0000000
--- a/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2015 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.util.messagebus.common.adapter;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public
-class Java8Adapter implements MapAdapter {
- @Override
- public final
- ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) {
- return new ConcurrentHashMap(size, loadFactor, stripeSize);
- }
-}
diff --git a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java b/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java
deleted file mode 100644
index 3ccb999..0000000
--- a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2015 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.util.messagebus.common.adapter;
-
-import java.util.concurrent.ConcurrentMap;
-
-public
-class JavaVersionAdapter {
-
-
- private static final MapAdapter get;
-
- static {
- MapAdapter adapter;
- try {
- Class.forName("java.util.concurrent.locks.StampedLock");
- adapter = new Java8Adapter();
- } catch (Exception e) {
- adapter = new Java6Adapter();
- }
-
- get = adapter;
- }
-
- public static
- ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) {
- return get.concurrentMap(size, loadFactor, stripeSize);
- }
-}
diff --git a/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java b/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java
deleted file mode 100644
index 96e59e6..0000000
--- a/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.util.messagebus.common.adapter;
-
-import java.util.concurrent.ConcurrentMap;
-
-public
-interface MapAdapter {
- ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize);
-}
diff --git a/src/dorkbox/util/messagebus/common/DeadMessage.java b/src/dorkbox/util/messagebus/error/DeadMessage.java
similarity index 98%
rename from src/dorkbox/util/messagebus/common/DeadMessage.java
rename to src/dorkbox/util/messagebus/error/DeadMessage.java
index 39e9e78..be2e2db 100644
--- a/src/dorkbox/util/messagebus/common/DeadMessage.java
+++ b/src/dorkbox/util/messagebus/error/DeadMessage.java
@@ -35,7 +35,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package dorkbox.util.messagebus.common;
+package dorkbox.util.messagebus.error;
import java.util.Arrays;
diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java
index c49662e..d1ee694 100644
--- a/src/dorkbox/util/messagebus/publication/PublisherExact.java
+++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java
@@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
-import dorkbox.util.messagebus.common.DeadMessage;
+import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java
index e848dc9..edc13c2 100644
--- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java
+++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java
@@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
-import dorkbox.util.messagebus.common.DeadMessage;
+import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
@@ -45,7 +45,7 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run subscriptions
- final Subscription[] subscriptions = subManager.getExactAsArray(message1Class); // can return null
+ final Subscription[] subscriptions = subManager.getExact(message1Class); // can return null
if (subscriptions != null) {
hasSubs = true;
synchrony.publish(subscriptions, message1);
@@ -62,7 +62,7 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run dead message subscriptions
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
- final Subscription[] deadSubscriptions = subManager.getExactAsArray(DeadMessage.class); // can return null
+ final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}
diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java
index 52cc5b7..9ded1c7 100644
--- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java
+++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java
@@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
-import dorkbox.util.messagebus.common.DeadMessage;
+import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
index 12f1676..876e5e6 100644
--- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
+++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
@@ -63,7 +63,7 @@ class SubscriptionManager {
private volatile IdentityMap, Subscription[]> subsPerMessageSingle;
// keeps track of all subscriptions of the super classes of a message type.
- private volatile IdentityMap, Subscription[]> subsPerMessageSuperSingle;
+ private volatile IdentityMap, Subscription[]> subsPerSuperMessageSingle;
@@ -88,10 +88,10 @@ class SubscriptionManager {
IdentityMap.class,
"subsPerMessageSingle");
- private final AtomicReferenceFieldUpdater subsPerMessageSuperSingleRef =
+ private final AtomicReferenceFieldUpdater subsPerSuperMessageSingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
- "subsPerMessageSuperSingle");
+ "subsPerSuperMessageSingle");
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
@@ -107,7 +107,7 @@ class SubscriptionManager {
nonListeners = new IdentityMap, Boolean>(16, LOAD_FACTOR);
subsPerListener = new IdentityMap<>(32, LOAD_FACTOR);
subsPerMessageSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR);
- subsPerMessageSuperSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR);
+ subsPerSuperMessageSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR);
@@ -128,7 +128,7 @@ class SubscriptionManager {
this.nonListeners.clear();
this.subsPerMessageSingle.clear();
- this.subsPerMessageSuperSingle.clear();
+ this.subsPerSuperMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subsPerListener.clear();
@@ -195,8 +195,8 @@ class SubscriptionManager {
// create the subscriptions
subscriptions = new Subscription[handlersSize];
- final IdentityMap, Subscription[]> subsPerMessageSingle = subsPerMessageSingleREF.get(this);
-// final IdentityMap, Subscription[]> subsPerMessageSingle = this.subsPerMessageSingle;
+ // access a snapshot of the subscriptions (single-writer-principle)
+ final IdentityMap, Subscription[]> local = subsPerMessageSingleREF.get(this);
Subscription subscription;
@@ -218,11 +218,10 @@ class SubscriptionManager {
messageHandlerTypes = messageHandler.getHandledMessages();
handlerType = messageHandlerTypes[0];
- if (!subsPerMessageSingle.containsKey(handlerType)) {
- subsPerMessageSingle.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary
+ if (!local.containsKey(handlerType)) {
+ local.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary
}
-
// create the subscription. This can be thrown away if the subscription succeeds in another thread
subscription = new Subscription(messageHandler);
subscriptions[i] = subscription;
@@ -247,22 +246,21 @@ class SubscriptionManager {
// makes this subscription visible for publication
- final Subscription[] currentSubs = subsPerMessageSingle.get(handlerType);
+ final Subscription[] currentSubs = local.get(handlerType);
final int currentLength = currentSubs.length;
// add the new subscription to the beginning of the array
final Subscription[] newSubs = new Subscription[currentLength + 1];
newSubs[0] = subscription;
System.arraycopy(currentSubs, 0, newSubs, 1, currentLength);
- subsPerMessageSingle.put(handlerType, newSubs);
+ local.put(handlerType, newSubs);
- // update the super types/varity types
- registerSuperSubs(handlerType);
+ // update the super types
+ registerSuperSubs(handlerType, local);
}
- subsPerMessageSingleREF.lazySet(this, subsPerMessageSingle);
-// SUBS_SINGLE.set(this, subsPerMessageSingle);
-// this.subsPerMessageSingle = subsPerMessageSingle;
+ // save this snapshot back to the original (single writer principle)
+ subsPerMessageSingleREF.lazySet(this, local);
}
else {
// subscriptions already exist and must only be updated
@@ -298,11 +296,11 @@ class SubscriptionManager {
}
private
- void registerSuperSubs(final Class> clazz) {
+ void registerSuperSubs(final Class> clazz, final IdentityMap, Subscription[]> subsPerMessageSingle) {
final Class>[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response
- final IdentityMap, Subscription[]> local = subsPerMessageSuperSingleRef.get(this);
-// final IdentityMap, Subscription[]> local = this.subsPerMessageSuperSingle;
+ // access a snapshot of the subscriptions (single-writer-principle)
+ final IdentityMap, Subscription[]> local = subsPerSuperMessageSingleREF.get(this);
// types was not empty, so collect subscriptions for each type and collate them
// save the subscriptions
@@ -318,7 +316,7 @@ class SubscriptionManager {
// walks through all of the subscriptions that might exist for super types, and if applicable, save them
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
- superSubs = getExactAsArray(superClass);
+ superSubs = subsPerMessageSingle.get(superClass);
if (superSubs != null) {
superSubLength = superSubs.length;
@@ -338,9 +336,8 @@ class SubscriptionManager {
subsAsList.toArray(subs);
local.put(clazz, subs);
-
- subsPerMessageSuperSingleRef.lazySet(this, local);
-// subsPerMessageSuperSingle = local;
+ // save this snapshot back to the original (single writer principle)
+ subsPerSuperMessageSingleREF.lazySet(this, local);
}
}
@@ -448,17 +445,18 @@ class SubscriptionManager {
}
}
-
+ // can return null
public
- Subscription[] getExactAsArray(final Class> messageClass) {
+ Subscription[] getExact(final Class> messageClass) {
return (Subscription[]) subsPerMessageSingleREF.get(this).get(messageClass);
// return subsPerMessageSingle.get(messageClass);
}
+ // can return null
public
Subscription[] getSuperExactAsArray(final Class> messageClass) {
// whenever our subscriptions change, this map is cleared.
- return (Subscription[]) subsPerMessageSuperSingleRef.get(this).get(messageClass);
+ return (Subscription[]) subsPerSuperMessageSingleREF.get(this).get(messageClass);
// return this.subsPerMessageSuperSingle.get(messageClass);
}
@@ -473,11 +471,6 @@ class SubscriptionManager {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
- // can return null
- public
- Subscription[] getExact(final Class> messageClass) {
- return getExactAsArray(messageClass);
- }
// can return null
public
@@ -513,7 +506,7 @@ class SubscriptionManager {
// can return null
public
Subscription[] getExactAndSuper(final Class> messageClass) {
- Subscription[] collection = getExactAsArray(messageClass); // can return null
+ Subscription[] collection = getExact(messageClass); // can return null
// now publish superClasses
final Subscription[] superSubscriptions = getSuperExactAsArray(messageClass); // can return null
diff --git a/src/dorkbox/util/messagebus/subscription/WriterDistruptor.java b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java
similarity index 97%
rename from src/dorkbox/util/messagebus/subscription/WriterDistruptor.java
rename to src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java
index 1e87bc2..a17bfd8 100644
--- a/src/dorkbox/util/messagebus/subscription/WriterDistruptor.java
+++ b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.LockSupport;
* subscriptions. Even with concurrent hashMaps, there is still locks happening during contention.
*/
public
-class WriterDistruptor {
+class SubscriptionWriterABQ {
private WorkProcessor workProcessor;
private SubscriptionHandler handler;
@@ -32,7 +32,7 @@ class WriterDistruptor {
private Sequence workSequence;
public
- WriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
+ SubscriptionWriterABQ(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(1, 1,
diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java
new file mode 100644
index 0000000..727b0c6
--- /dev/null
+++ b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java
@@ -0,0 +1,159 @@
+package dorkbox.util.messagebus.subscription;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.LiteBlockingWaitStrategy;
+import com.lmax.disruptor.PhasedBackoffWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.Sequence;
+import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.Sequencer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.WorkProcessor;
+import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
+import dorkbox.util.messagebus.error.ErrorHandlingSupport;
+import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+
+/**
+ * Objective of this class is to conform to the "single writer principle", in order to maintain CLEAN AND SIMPLE concurrency for the
+ * subscriptions. Even with concurrent hashMaps, there is still locks happening during contention.
+ */
+public
+class SubscriptionWriterDistruptor {
+
+ private WorkProcessor workProcessor;
+ private SubscriptionHandler handler;
+ private RingBuffer ringBuffer;
+ private Sequence workSequence;
+
+ public
+ SubscriptionWriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
+ // Now we setup the disruptor and work handlers
+
+ ExecutorService executor = new ThreadPoolExecutor(1, 1,
+ 0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
+ new java.util.concurrent.LinkedTransferQueue(),
+ new NamedThreadFactory("MessageBus-Subscriber"));
+
+ final PublicationExceptionHandler exceptionHandler = new PublicationExceptionHandler(errorHandler);
+ EventFactory factory = new SubscriptionFactory();
+
+ // setup the work handlers
+ handler = new SubscriptionHandler(subscriptionManager);
+
+
+// final int BUFFER_SIZE = ringBufferSize * 64;
+ final int BUFFER_SIZE = 1024 * 64;
+// final int BUFFER_SIZE = 1024;
+// final int BUFFER_SIZE = 32;
+// final int BUFFER_SIZE = 16;
+// final int BUFFER_SIZE = 8;
+// final int BUFFER_SIZE = 4;
+
+
+ WaitStrategy consumerWaitStrategy;
+// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one
+// consumerWaitStrategy = new BlockingWaitStrategy();
+// consumerWaitStrategy = new YieldingWaitStrategy();
+// consumerWaitStrategy = new BusySpinWaitStrategy();
+// consumerWaitStrategy = new SleepingWaitStrategy();
+// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
+// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
+ consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
+
+
+ ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
+ SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
+
+
+ // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
+ workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
+ workProcessor = new WorkProcessor(ringBuffer, sequenceBarrier, handler, exceptionHandler, workSequence);
+
+
+ // setup the WorkProcessor sequences (control what is consumed from the ring buffer)
+ final Sequence[] sequences = getSequences();
+ ringBuffer.addGatingSequences(sequences);
+
+
+ // configure the start position for the WorkProcessors, and start them
+ final long cursor = ringBuffer.getCursor();
+ workSequence.set(cursor);
+
+ workProcessor.getSequence()
+ .set(cursor);
+
+ executor.execute(workProcessor);
+ }
+
+ /**
+ * @param listener is never null
+ */
+ public
+ void subscribe(final Object listener) {
+ long seq = ringBuffer.next();
+
+ SubscriptionHolder job = ringBuffer.get(seq);
+ job.doSubscribe = true;
+ job.listener = listener;
+
+ ringBuffer.publish(seq);
+ }
+
+ /**
+ * @param listener is never null
+ */
+ public
+ void unsubscribe(final Object listener) {
+ long seq = ringBuffer.next();
+
+ SubscriptionHolder job = ringBuffer.get(seq);
+ job.doSubscribe = false;
+ job.listener = listener;
+
+ ringBuffer.publish(seq);
+ }
+
+
+ // gets the sequences used for processing work
+ private
+ Sequence[] getSequences() {
+ final Sequence[] sequences = new Sequence[2];
+ sequences[0] = workProcessor.getSequence();
+ sequences[1] = workSequence; // always add the work sequence
+ return sequences;
+ }
+
+
+ public
+ void start() {
+ }
+
+ public
+ void shutdown() {
+ workProcessor.halt();
+
+ while (!handler.isShutdown()) {
+ LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
+ }
+ }
+
+ public
+ boolean hasPendingMessages() {
+ // from workerPool.drainAndHalt()
+ Sequence[] workerSequences = getSequences();
+ final long cursor = ringBuffer.getCursor();
+ for (Sequence s : workerSequences) {
+ if (cursor > s.get()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
index d9bf282..83ca194 100644
--- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java
+++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
@@ -72,7 +72,7 @@ class VarArgUtils {
// this gets (and caches) our array type. This is never cleared.
final Class> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
- final Subscription[] subs = subManager.getExactAsArray(arrayVersion);
+ final Subscription[] subs = subManager.getExact(arrayVersion);
if (subs != null) {
final int length = subs.length;
final ArrayList varArgSubsAsList = new ArrayList(length);
@@ -134,7 +134,7 @@ class VarArgUtils {
for (int i = 0; i < typesLength; i++) {
type = types[i];
- subs = subManager.getExactAsArray(type);
+ subs = subManager.getExact(type);
if (subs != null) {
length = subs.length;
diff --git a/test/dorkbox/util/messagebus/DeadMessageTest.java b/test/dorkbox/util/messagebus/DeadMessageTest.java
index 816c6bc..da15329 100644
--- a/test/dorkbox/util/messagebus/DeadMessageTest.java
+++ b/test/dorkbox/util/messagebus/DeadMessageTest.java
@@ -24,6 +24,7 @@ package dorkbox.util.messagebus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.*;
+import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.ObjectListener;