From 61756547bb08b242524943fd14587419bd8ab9e7 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 20 Jan 2016 13:48:47 +0100 Subject: [PATCH] Code cleanup --- src/dorkbox/util/messagebus/MessageBus.java | 12 +- .../messagebus/common/MessageHandler.java | 11 +- .../common/adapter/Java6Adapter.java | 29 ---- .../common/adapter/Java8Adapter.java | 28 --- .../common/adapter/JavaVersionAdapter.java | 42 ----- .../messagebus/common/adapter/MapAdapter.java | 23 --- .../{common => error}/DeadMessage.java | 2 +- .../publication/PublisherExact.java | 2 +- .../PublisherExactWithSuperTypes.java | 6 +- ...PublisherExactWithSuperTypesAndVarity.java | 2 +- .../subscription/SubscriptionManager.java | 59 +++---- ...ruptor.java => SubscriptionWriterABQ.java} | 4 +- .../SubscriptionWriterDistruptor.java | 159 ++++++++++++++++++ .../util/messagebus/utils/VarArgUtils.java | 4 +- .../util/messagebus/DeadMessageTest.java | 1 + 15 files changed, 206 insertions(+), 178 deletions(-) delete mode 100644 src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java delete mode 100644 src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java delete mode 100644 src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java delete mode 100644 src/dorkbox/util/messagebus/common/adapter/MapAdapter.java rename src/dorkbox/util/messagebus/{common => error}/DeadMessage.java (98%) rename src/dorkbox/util/messagebus/subscription/{WriterDistruptor.java => SubscriptionWriterABQ.java} (97%) create mode 100644 src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index f92fc61..325af88 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -22,7 +22,7 @@ import dorkbox.util.messagebus.publication.PublisherExact; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity; import dorkbox.util.messagebus.subscription.SubscriptionManager; -import dorkbox.util.messagebus.subscription.WriterDistruptor; +import dorkbox.util.messagebus.subscription.SubscriptionWriterDistruptor; import dorkbox.util.messagebus.synchrony.AsyncDisruptor; import dorkbox.util.messagebus.synchrony.Sync; import dorkbox.util.messagebus.synchrony.Synchrony; @@ -31,7 +31,7 @@ import dorkbox.util.messagebus.synchrony.Synchrony; * The base class for all message bus implementations with support for asynchronous message dispatch. * * See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html - * tldr; we use single-writer-principle + Atomic.lazySet + * tldr; we use single-writer-principle + lazySet/get * * @author dorkbox, llc * Date: 2/2/15 @@ -40,7 +40,7 @@ public class MessageBus implements IMessageBus { private final ErrorHandlingSupport errorHandler; - private final WriterDistruptor subscriptionWriter; + private final SubscriptionWriterDistruptor subscriptionWriter; private final SubscriptionManager subscriptionManager; @@ -93,7 +93,7 @@ class MessageBus implements IMessageBus { */ this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler); - subscriptionWriter = new WriterDistruptor(errorHandler, subscriptionManager); + subscriptionWriter = new SubscriptionWriterDistruptor(errorHandler, subscriptionManager); switch (publishMode) { @@ -133,7 +133,7 @@ class MessageBus implements IMessageBus { return; } -// subscriptionManager.subscribe(listener); + // single writer principle subscriptionWriter.subscribe(listener); } @@ -144,7 +144,7 @@ class MessageBus implements IMessageBus { return; } -// subscriptionManager.unsubscribe(listener); + // single writer principle subscriptionWriter.unsubscribe(listener); } diff --git a/src/dorkbox/util/messagebus/common/MessageHandler.java b/src/dorkbox/util/messagebus/common/MessageHandler.java index 964f184..d8fdfa8 100644 --- a/src/dorkbox/util/messagebus/common/MessageHandler.java +++ b/src/dorkbox/util/messagebus/common/MessageHandler.java @@ -58,7 +58,6 @@ import java.util.Arrays; *

* BECAUSE OF THIS, we always treat the two the same *

- *

* * @author bennidi * Date: 11/14/12 @@ -67,14 +66,12 @@ import java.util.Arrays; */ public class MessageHandler { - // publish all listeners defined by the given class (includes - // listeners defined in super classes) + // publish all listeners defined by the given class (includes listeners defined in super classes) public static -//cache this? - MessageHandler[] get(final Class target) { + MessageHandler[] get(final Class messageClass) { // publish all handlers (this will include all (inherited) methods directly annotated using @Handler) - final Method[] allMethods = ReflectionUtils.getMethods(target); + final Method[] allMethods = ReflectionUtils.getMethods(messageClass); final int length = allMethods.length; final ArrayList finalMethods = new ArrayList(length); @@ -94,7 +91,7 @@ class MessageHandler { continue; } - Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target); + Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, messageClass); if (overriddenHandler == null) { overriddenHandler = method; } diff --git a/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java b/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java deleted file mode 100644 index 46f28da..0000000 --- a/src/dorkbox/util/messagebus/common/adapter/Java6Adapter.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.adapter; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public -class Java6Adapter implements MapAdapter { - - @Override - public final - ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) { - return new ConcurrentHashMap(size, loadFactor, stripeSize); - } -} diff --git a/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java b/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java deleted file mode 100644 index 58977d3..0000000 --- a/src/dorkbox/util/messagebus/common/adapter/Java8Adapter.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.adapter; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public -class Java8Adapter implements MapAdapter { - @Override - public final - ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) { - return new ConcurrentHashMap(size, loadFactor, stripeSize); - } -} diff --git a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java b/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java deleted file mode 100644 index 3ccb999..0000000 --- a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.adapter; - -import java.util.concurrent.ConcurrentMap; - -public -class JavaVersionAdapter { - - - private static final MapAdapter get; - - static { - MapAdapter adapter; - try { - Class.forName("java.util.concurrent.locks.StampedLock"); - adapter = new Java8Adapter(); - } catch (Exception e) { - adapter = new Java6Adapter(); - } - - get = adapter; - } - - public static - ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize) { - return get.concurrentMap(size, loadFactor, stripeSize); - } -} diff --git a/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java b/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java deleted file mode 100644 index 96e59e6..0000000 --- a/src/dorkbox/util/messagebus/common/adapter/MapAdapter.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.adapter; - -import java.util.concurrent.ConcurrentMap; - -public -interface MapAdapter { - ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize); -} diff --git a/src/dorkbox/util/messagebus/common/DeadMessage.java b/src/dorkbox/util/messagebus/error/DeadMessage.java similarity index 98% rename from src/dorkbox/util/messagebus/common/DeadMessage.java rename to src/dorkbox/util/messagebus/error/DeadMessage.java index 39e9e78..be2e2db 100644 --- a/src/dorkbox/util/messagebus/common/DeadMessage.java +++ b/src/dorkbox/util/messagebus/error/DeadMessage.java @@ -35,7 +35,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.util.messagebus.common; +package dorkbox.util.messagebus.error; import java.util.Arrays; diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java index c49662e..d1ee694 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java @@ -15,7 +15,7 @@ */ package dorkbox.util.messagebus.publication; -import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.error.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java index e848dc9..edc13c2 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java @@ -15,7 +15,7 @@ */ package dorkbox.util.messagebus.publication; -import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.error.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -45,7 +45,7 @@ class PublisherExactWithSuperTypes implements Publisher { // Run subscriptions - final Subscription[] subscriptions = subManager.getExactAsArray(message1Class); // can return null + final Subscription[] subscriptions = subManager.getExact(message1Class); // can return null if (subscriptions != null) { hasSubs = true; synchrony.publish(subscriptions, message1); @@ -62,7 +62,7 @@ class PublisherExactWithSuperTypes implements Publisher { // Run dead message subscriptions if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getExactAsArray(DeadMessage.class); // can return null + final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null if (deadSubscriptions != null) { synchrony.publish(deadSubscriptions, new DeadMessage(message1)); } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java index 52cc5b7..9ded1c7 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java @@ -15,7 +15,7 @@ */ package dorkbox.util.messagebus.publication; -import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.error.DeadMessage; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 12f1676..876e5e6 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -63,7 +63,7 @@ class SubscriptionManager { private volatile IdentityMap, Subscription[]> subsPerMessageSingle; // keeps track of all subscriptions of the super classes of a message type. - private volatile IdentityMap, Subscription[]> subsPerMessageSuperSingle; + private volatile IdentityMap, Subscription[]> subsPerSuperMessageSingle; @@ -88,10 +88,10 @@ class SubscriptionManager { IdentityMap.class, "subsPerMessageSingle"); - private final AtomicReferenceFieldUpdater subsPerMessageSuperSingleRef = + private final AtomicReferenceFieldUpdater subsPerSuperMessageSingleREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, IdentityMap.class, - "subsPerMessageSuperSingle"); + "subsPerSuperMessageSingle"); //NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to // use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the @@ -107,7 +107,7 @@ class SubscriptionManager { nonListeners = new IdentityMap, Boolean>(16, LOAD_FACTOR); subsPerListener = new IdentityMap<>(32, LOAD_FACTOR); subsPerMessageSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR); - subsPerMessageSuperSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR); + subsPerSuperMessageSingle = new IdentityMap, Subscription[]>(32, LOAD_FACTOR); @@ -128,7 +128,7 @@ class SubscriptionManager { this.nonListeners.clear(); this.subsPerMessageSingle.clear(); - this.subsPerMessageSuperSingle.clear(); + this.subsPerSuperMessageSingle.clear(); this.subscriptionsPerMessageMulti.clear(); this.subsPerListener.clear(); @@ -195,8 +195,8 @@ class SubscriptionManager { // create the subscriptions subscriptions = new Subscription[handlersSize]; - final IdentityMap, Subscription[]> subsPerMessageSingle = subsPerMessageSingleREF.get(this); -// final IdentityMap, Subscription[]> subsPerMessageSingle = this.subsPerMessageSingle; + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap, Subscription[]> local = subsPerMessageSingleREF.get(this); Subscription subscription; @@ -218,11 +218,10 @@ class SubscriptionManager { messageHandlerTypes = messageHandler.getHandledMessages(); handlerType = messageHandlerTypes[0]; - if (!subsPerMessageSingle.containsKey(handlerType)) { - subsPerMessageSingle.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary + if (!local.containsKey(handlerType)) { + local.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary } - // create the subscription. This can be thrown away if the subscription succeeds in another thread subscription = new Subscription(messageHandler); subscriptions[i] = subscription; @@ -247,22 +246,21 @@ class SubscriptionManager { // makes this subscription visible for publication - final Subscription[] currentSubs = subsPerMessageSingle.get(handlerType); + final Subscription[] currentSubs = local.get(handlerType); final int currentLength = currentSubs.length; // add the new subscription to the beginning of the array final Subscription[] newSubs = new Subscription[currentLength + 1]; newSubs[0] = subscription; System.arraycopy(currentSubs, 0, newSubs, 1, currentLength); - subsPerMessageSingle.put(handlerType, newSubs); + local.put(handlerType, newSubs); - // update the super types/varity types - registerSuperSubs(handlerType); + // update the super types + registerSuperSubs(handlerType, local); } - subsPerMessageSingleREF.lazySet(this, subsPerMessageSingle); -// SUBS_SINGLE.set(this, subsPerMessageSingle); -// this.subsPerMessageSingle = subsPerMessageSingle; + // save this snapshot back to the original (single writer principle) + subsPerMessageSingleREF.lazySet(this, local); } else { // subscriptions already exist and must only be updated @@ -298,11 +296,11 @@ class SubscriptionManager { } private - void registerSuperSubs(final Class clazz) { + void registerSuperSubs(final Class clazz, final IdentityMap, Subscription[]> subsPerMessageSingle) { final Class[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response - final IdentityMap, Subscription[]> local = subsPerMessageSuperSingleRef.get(this); -// final IdentityMap, Subscription[]> local = this.subsPerMessageSuperSingle; + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap, Subscription[]> local = subsPerSuperMessageSingleREF.get(this); // types was not empty, so collect subscriptions for each type and collate them // save the subscriptions @@ -318,7 +316,7 @@ class SubscriptionManager { // walks through all of the subscriptions that might exist for super types, and if applicable, save them for (int i = 0; i < length; i++) { superClass = superClasses[i]; - superSubs = getExactAsArray(superClass); + superSubs = subsPerMessageSingle.get(superClass); if (superSubs != null) { superSubLength = superSubs.length; @@ -338,9 +336,8 @@ class SubscriptionManager { subsAsList.toArray(subs); local.put(clazz, subs); - - subsPerMessageSuperSingleRef.lazySet(this, local); -// subsPerMessageSuperSingle = local; + // save this snapshot back to the original (single writer principle) + subsPerSuperMessageSingleREF.lazySet(this, local); } } @@ -448,17 +445,18 @@ class SubscriptionManager { } } - + // can return null public - Subscription[] getExactAsArray(final Class messageClass) { + Subscription[] getExact(final Class messageClass) { return (Subscription[]) subsPerMessageSingleREF.get(this).get(messageClass); // return subsPerMessageSingle.get(messageClass); } + // can return null public Subscription[] getSuperExactAsArray(final Class messageClass) { // whenever our subscriptions change, this map is cleared. - return (Subscription[]) subsPerMessageSuperSingleRef.get(this).get(messageClass); + return (Subscription[]) subsPerSuperMessageSingleREF.get(this).get(messageClass); // return this.subsPerMessageSuperSingle.get(messageClass); } @@ -473,11 +471,6 @@ class SubscriptionManager { return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); } - // can return null - public - Subscription[] getExact(final Class messageClass) { - return getExactAsArray(messageClass); - } // can return null public @@ -513,7 +506,7 @@ class SubscriptionManager { // can return null public Subscription[] getExactAndSuper(final Class messageClass) { - Subscription[] collection = getExactAsArray(messageClass); // can return null + Subscription[] collection = getExact(messageClass); // can return null // now publish superClasses final Subscription[] superSubscriptions = getSuperExactAsArray(messageClass); // can return null diff --git a/src/dorkbox/util/messagebus/subscription/WriterDistruptor.java b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java similarity index 97% rename from src/dorkbox/util/messagebus/subscription/WriterDistruptor.java rename to src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java index 1e87bc2..a17bfd8 100644 --- a/src/dorkbox/util/messagebus/subscription/WriterDistruptor.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterABQ.java @@ -24,7 +24,7 @@ import java.util.concurrent.locks.LockSupport; * subscriptions. Even with concurrent hashMaps, there is still locks happening during contention. */ public -class WriterDistruptor { +class SubscriptionWriterABQ { private WorkProcessor workProcessor; private SubscriptionHandler handler; @@ -32,7 +32,7 @@ class WriterDistruptor { private Sequence workSequence; public - WriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) { + SubscriptionWriterABQ(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) { // Now we setup the disruptor and work handlers ExecutorService executor = new ThreadPoolExecutor(1, 1, diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java new file mode 100644 index 0000000..727b0c6 --- /dev/null +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionWriterDistruptor.java @@ -0,0 +1,159 @@ +package dorkbox.util.messagebus.subscription; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.LiteBlockingWaitStrategy; +import com.lmax.disruptor.PhasedBackoffWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.Sequencer; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.WorkProcessor; +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + + +/** + * Objective of this class is to conform to the "single writer principle", in order to maintain CLEAN AND SIMPLE concurrency for the + * subscriptions. Even with concurrent hashMaps, there is still locks happening during contention. + */ +public +class SubscriptionWriterDistruptor { + + private WorkProcessor workProcessor; + private SubscriptionHandler handler; + private RingBuffer ringBuffer; + private Sequence workSequence; + + public + SubscriptionWriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) { + // Now we setup the disruptor and work handlers + + ExecutorService executor = new ThreadPoolExecutor(1, 1, + 0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter + new java.util.concurrent.LinkedTransferQueue(), + new NamedThreadFactory("MessageBus-Subscriber")); + + final PublicationExceptionHandler exceptionHandler = new PublicationExceptionHandler(errorHandler); + EventFactory factory = new SubscriptionFactory(); + + // setup the work handlers + handler = new SubscriptionHandler(subscriptionManager); + + +// final int BUFFER_SIZE = ringBufferSize * 64; + final int BUFFER_SIZE = 1024 * 64; +// final int BUFFER_SIZE = 1024; +// final int BUFFER_SIZE = 32; +// final int BUFFER_SIZE = 16; +// final int BUFFER_SIZE = 8; +// final int BUFFER_SIZE = 4; + + + WaitStrategy consumerWaitStrategy; +// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one +// consumerWaitStrategy = new BlockingWaitStrategy(); +// consumerWaitStrategy = new YieldingWaitStrategy(); +// consumerWaitStrategy = new BusySpinWaitStrategy(); +// consumerWaitStrategy = new SleepingWaitStrategy(); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0)); +// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy()); + consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); + + + ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy); + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + + // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item + workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + workProcessor = new WorkProcessor(ringBuffer, sequenceBarrier, handler, exceptionHandler, workSequence); + + + // setup the WorkProcessor sequences (control what is consumed from the ring buffer) + final Sequence[] sequences = getSequences(); + ringBuffer.addGatingSequences(sequences); + + + // configure the start position for the WorkProcessors, and start them + final long cursor = ringBuffer.getCursor(); + workSequence.set(cursor); + + workProcessor.getSequence() + .set(cursor); + + executor.execute(workProcessor); + } + + /** + * @param listener is never null + */ + public + void subscribe(final Object listener) { + long seq = ringBuffer.next(); + + SubscriptionHolder job = ringBuffer.get(seq); + job.doSubscribe = true; + job.listener = listener; + + ringBuffer.publish(seq); + } + + /** + * @param listener is never null + */ + public + void unsubscribe(final Object listener) { + long seq = ringBuffer.next(); + + SubscriptionHolder job = ringBuffer.get(seq); + job.doSubscribe = false; + job.listener = listener; + + ringBuffer.publish(seq); + } + + + // gets the sequences used for processing work + private + Sequence[] getSequences() { + final Sequence[] sequences = new Sequence[2]; + sequences[0] = workProcessor.getSequence(); + sequences[1] = workSequence; // always add the work sequence + return sequences; + } + + + public + void start() { + } + + public + void shutdown() { + workProcessor.halt(); + + while (!handler.isShutdown()) { + LockSupport.parkNanos(100L); // wait 100ms for handlers to quit + } + } + + public + boolean hasPendingMessages() { + // from workerPool.drainAndHalt() + Sequence[] workerSequences = getSequences(); + final long cursor = ringBuffer.getCursor(); + for (Sequence s : workerSequences) { + if (cursor > s.get()) { + return true; + } + } + + return false; + } +} diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java index d9bf282..83ca194 100644 --- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java +++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java @@ -72,7 +72,7 @@ class VarArgUtils { // this gets (and caches) our array type. This is never cleared. final Class arrayVersion = this.superClassUtils.getArrayClass(messageClass); - final Subscription[] subs = subManager.getExactAsArray(arrayVersion); + final Subscription[] subs = subManager.getExact(arrayVersion); if (subs != null) { final int length = subs.length; final ArrayList varArgSubsAsList = new ArrayList(length); @@ -134,7 +134,7 @@ class VarArgUtils { for (int i = 0; i < typesLength; i++) { type = types[i]; - subs = subManager.getExactAsArray(type); + subs = subManager.getExact(type); if (subs != null) { length = subs.length; diff --git a/test/dorkbox/util/messagebus/DeadMessageTest.java b/test/dorkbox/util/messagebus/DeadMessageTest.java index 816c6bc..da15329 100644 --- a/test/dorkbox/util/messagebus/DeadMessageTest.java +++ b/test/dorkbox/util/messagebus/DeadMessageTest.java @@ -24,6 +24,7 @@ package dorkbox.util.messagebus; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.*; +import dorkbox.util.messagebus.error.DeadMessage; import dorkbox.util.messagebus.listeners.IMessageListener; import dorkbox.util.messagebus.listeners.MessagesListener; import dorkbox.util.messagebus.listeners.ObjectListener;