Changed async message queue implementation to the Conversant Disruptor, and added the ability to use a specific BlockingQueue implementation for the async publications of messages. Added support for a zero-gc messagebus, in the event that you need one.

master
nathan 2019-06-08 01:40:27 +02:00
parent e19291bb0a
commit 887fdecadf
18 changed files with 1042 additions and 718 deletions

View File

@ -16,9 +16,6 @@
import java.time.Instant
import java.util.*
import kotlin.reflect.KMutableProperty
import kotlin.reflect.full.declaredMemberProperties
///////////////////////////////
////// PUBLISH TO SONATYPE / MAVEN CENTRAL
@ -36,14 +33,14 @@ plugins {
`maven-publish`
// close and release on sonatype
id("io.codearte.nexus-staging") version "0.20.0"
id("io.codearte.nexus-staging") version "0.21.0"
id("com.dorkbox.CrossCompile") version "1.0.1"
id("com.dorkbox.Licensing") version "1.4"
id("com.dorkbox.VersionUpdate") version "1.4.1"
id("com.dorkbox.GradleUtils") version "1.0"
id("com.dorkbox.VersionUpdate") version "1.5"
id("com.dorkbox.GradleUtils") version "1.2"
kotlin("jvm") version "1.3.21"
kotlin("jvm") version "1.3.31"
}
object Extras {
@ -68,34 +65,11 @@ object Extras {
///////////////////////////////
///// assign 'Extras'
///////////////////////////////
GradleUtils.load("$projectDir/../../gradle.properties", Extras)
description = Extras.description
group = Extras.group
version = Extras.version
val propsFile = File("$projectDir/../../gradle.properties").normalize()
if (propsFile.canRead()) {
println("\tLoading custom property data from: [$propsFile]")
val props = Properties()
propsFile.inputStream().use {
props.load(it)
}
val extraProperties = Extras::class.declaredMemberProperties.filterIsInstance<KMutableProperty<String>>()
props.forEach { (k, v) -> run {
val key = k as String
val value = v as String
val member = extraProperties.find { it.name == key }
if (member != null) {
member.setter.call(Extras::class.objectInstance, value)
}
else {
project.extra.set(k, v)
}
}}
}
licensing {
license(License.APACHE_2) {
@ -118,9 +92,9 @@ licensing {
license("Disruptor", License.APACHE_2) {
copyright(2011)
author("LMAX Ltd")
url("https://github.com/LMAX-Exchange/disruptor/")
note("High Performance Inter-Thread Messaging Library")
author("Conversant")
url("https://github.com/conversant/disruptor/")
note("The highest performing intra-thread transfer mechanism available in Java")
}
license("FastThreadLocal", License.BSD_3) {
@ -130,10 +104,11 @@ licensing {
url("https://github.com/LWJGL/lwjgl3/blob/5819c9123222f6ce51f208e022cb907091dd8023/modules/core/src/main/java/org/lwjgl/system/FastThreadLocal.java")
}
license("Kryo", License.BSD_3) {
copyright(2008)
author("Nathan Sweet")
url("https://github.com/EsotericSoftware/kryo")
license("IntMap", License.APACHE_2) {
copyright(2013)
author("Mario Zechner <badlogicgames@gmail.com>")
author("Nathan Sweet <nathan.sweet@gmail.com>")
url("http://github.com/libgdx/libgdx/")
}
license("ReflectASM", License.BSD_3) {
@ -153,6 +128,12 @@ licensing {
author("QOS.ch")
url("http://www.slf4j.org")
}
license("Vibur Object Pool", License.APACHE_2) {
copyright(2013)
author("Simeon Malchev")
url("https://github.com/vibur/vibur-object-pool")
}
}
sourceSets {
@ -164,6 +145,15 @@ sourceSets {
include("**/*.java")
}
}
test {
java {
setSrcDirs(listOf("test"))
// want to include java files for the source. 'setSrcDirs' resets includes...
include("**/*.java")
}
}
}
repositories {
@ -205,13 +195,19 @@ tasks.compileJava.get().apply {
dependencies {
api ("com.lmax:disruptor:3.4.2")
implementation("com.dorkbox:Utilities:1.2")
api ("org.ow2.asm:asm:5.2")
api ("com.esotericsoftware:reflectasm:1.11.1")
api ("com.esotericsoftware:kryo:4.0.2")
implementation("com.conversantmedia:disruptor:1.2.15")
implementation("org.vibur:vibur-object-pool:23.0")
api("org.slf4j:slf4j-api:1.7.25")
implementation("org.ow2.asm:asm:7.1")
implementation("com.esotericsoftware:reflectasm:1.11.9")
implementation("org.slf4j:slf4j-api:1.7.26")
testCompile("junit:junit:4.12")
testCompile("ch.qos.logback:logback-classic:1.2.3")
}
///////////////////////////////

View File

@ -1,3 +1,18 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus;
public

View File

@ -15,6 +15,11 @@
*/
package dorkbox.messageBus;
import java.util.concurrent.BlockingQueue;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.dispatch.DispatchCancel;
import dorkbox.messageBus.dispatch.DispatchExact;
@ -22,88 +27,81 @@ import dorkbox.messageBus.dispatch.DispatchExactWithSuperTypes;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.error.IPublicationErrorHandler;
import dorkbox.messageBus.subscription.SubscriptionManager;
import dorkbox.messageBus.synchrony.AsyncABQ;
import dorkbox.messageBus.synchrony.AsyncABQ_noGc;
import dorkbox.messageBus.synchrony.AsyncDisruptor;
import dorkbox.messageBus.synchrony.Async;
import dorkbox.messageBus.synchrony.MessageHolder;
import dorkbox.messageBus.synchrony.Sync;
import dorkbox.messageBus.synchrony.Synchrony;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch.
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.
* <p/>
*
* Because the message bus keeps track of classes that are subscribed and published, reloading the classloader means that you will need to
* SHUTDOWN the messagebus when you unload the classloader, and then re-subscribe relevant classes when you reload the classes.
* <p/>
*
* Messages can be published synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* Message handlers can be invoked synchronously or asynchronously depending on their configuration. Thus, there
* are two notions of synchronicity / asynchronicity. One on the caller side, e.g. the invocation of the message publishing
* methods. The second on the handler side, e.g. whether the handler is invoked in the same or a different thread.
*
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages publish published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* in a synchronized fashion using @Synchronized annotation
*
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Handler annotation.
*
* <p/>
* By default, the bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages publish dispatched. This can be changed using the @Listener annotation.
*
* <p/>
* Generally message handlers will be invoked in inverse sequence of subscription but any
* client using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the respective message handlers.
*
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message.
*
* <p/>
* You may cancel any further dispatch of a message via {@link #cancel()}
*
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any message listener may only be subscribed once -> subsequent subscriptions of an already subscribed message listener
* will be silently ignored)
*
* <p/>
* Removing a listener (unsubscribing) means removing all subscribed message handlers of that listener. This remove operation
* immediately takes effect and on all running dispatch processes -> A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
* after the remove operation completed.
*
* <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* publish dispatched to all message handlers that take an instance of List as their parameter
*
*
*
* See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
* TLDR: we use single-writer-principle + lazySet/get for major performance
*
* @author bennidi
* Date: 2/8/12
* @author dorkbox, llc
* Date: 2/2/15
*/
@SuppressWarnings("WeakerAccess")
public
class MessageBus implements IMessageBus {
/**
* By default, we use ASM for accessing methods during the dispatch of messages. This is only available on certain platforms, and so
* it will gracefully 'fallback' to using standard java reflection to access the methods. "Standard java reflection" is not as fast
* as ASM, but only marginally.
*
* If you would like to use java reflection for accessing methods, set this value to false.
*/
public static boolean useAsmForDispatch = true;
/**
* 'useDisruptorForAsyncPublish' specifies to use the LMAX Disruptor for asynchronous dispatch of published messages. The benefit of
* such is that it is VERY high performance and generates zero garbage on the heap. The alternative (if this value is false), is to
* use an ArrayBlockingQueue, which has a "non-garbage" version (which is zero garbage, but slow-ish) and it's opposite (which
* generates garbage on the heap, but is faster).
*
* The disruptor is faster and better than either of these two, however because of it's use of unsafe, it is not available in all
* circumstances.
*/
public static boolean useDisruptorForAsyncPublish = true;
/**
* When using the ArrayBlockingQueue for the asynchronous dispatch of published messages, there are two modes of operation. A
* "non-garbage" version (which is zero garbage, but slow-ish) and it's opposite (which generates garbage on the heap, but is faster).
*
* By default, we strive to prevent garbage on the heap, so we use the "non-garbage" version. If you don't care about generating
* garbage on the heap, set this value to false.
*/
public static boolean useZeroGarbageVersionOfABQ = true;
/**
* By default, we use strong references when saving the subscribed listeners (these are the classes & methods that receive messages),
* however in certain environments (ie: spring), it is desirable to use weak references -- so that there are no memory leaks during
* the container lifecycle (or, more specifically, so one doesn't have to manually manage the memory).
*
* Using weak references is a tad slower than using strong references, since there are additional steps taken when there are orphaned
* references (when GC occurs) that have to be cleaned up. This cleanup occurs during message publication
*/
public static boolean useStrongReferencesByDefault = true;
static {
// check to see if we can use ASM for method access (it's a LOT faster than reflection). By default, we use ASM.
if (useAsmForDispatch) {
// only bother checking if we are different that the defaults
try {
Class.forName("com.esotericsoftware.reflectasm.MethodAccess");
} catch (Exception e) {
useAsmForDispatch = false;
}
}
// check to see if we can use the disruptor for publication (otherwise, we use native java). The disruptor is a lot faster, but
// not available on all platforms/JRE's because of it's use of UNSAFE.
if (useDisruptorForAsyncPublish) {
// only bother checking if we are different that the defaults
try {
Class.forName("com.lmax.disruptor.RingBuffer");
} catch (Exception e) {
useDisruptorForAsyncPublish = false;
}
}
}
class MessageBus {
/**
* Gets the version number.
*/
@ -113,102 +111,109 @@ class MessageBus implements IMessageBus {
}
/**
* Cancels the publication of the message (or messages). Only applicable for the currently running thread. No more subscribers for
* this message will be called.
* Cancels the publication of the message or messages. Only applicable for the current dispatched message.
* <p>
* No more subscribers for this message will be called.
*/
public static
void cancel() {
throw new DispatchCancel();
}
private static
Dispatch getDispatch(final DispatchMode dispatchMode, final ErrorHandler errorHandler, final SubscriptionManager subscriptionManager) {
if (dispatchMode == DispatchMode.Exact) {
return new DispatchExact(errorHandler, subscriptionManager);
}
return new DispatchExactWithSuperTypes(errorHandler, subscriptionManager);
}
private final ErrorHandler errorHandler;
private final SubscriptionManager subscriptionManager;
private final Dispatch dispatch;
private final Synchrony syncPublication;
private final Synchrony asyncPublication;
/**
* By default, will permit subType matching, and will use half of CPUs available for dispatching async messages
* Will permit subType matching for matching what subscription handles which message
* <p>
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous message publication
* <p>
* Will use half of CPUs available for dispatching async messages
*/
public
MessageBus() {
this(Runtime.getRuntime().availableProcessors()/2);
}
/**
* By default, will permit subType matching
* Will permit subType matching for matching what subscription handles which message
* <p>
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous message publication
*
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(final int numberOfThreads) {
this(DispatchMode.ExactWithSuperTypes, numberOfThreads);
this(DispatchMode.ExactWithSuperTypes, SubscriptionMode.StrongReferences, numberOfThreads);
}
/**
* By default, will use half of CPUs available for dispatching async messages
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous publication
*
* @param dispatchMode Specifies which publishMode to operate the publication of messages.
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
*/
public
MessageBus(final DispatchMode dispatchMode) {
this(dispatchMode, Runtime.getRuntime().availableProcessors());
MessageBus(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode) {
this(dispatchMode, subscriptionMode, Runtime.getRuntime().availableProcessors()/2);
}
/**
* @param dispatchMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to use for dispatching async messages
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous publication
*
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(final DispatchMode dispatchMode, int numberOfThreads) {
// round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
MessageBus(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode, final int numberOfThreads) {
this(dispatchMode, subscriptionMode, new DisruptorBlockingQueue<MessageHolder>(1024, SpinPolicy.BLOCKING), numberOfThreads);
}
/**
* Will use the Conversant Disruptor for asynchronous dispatch of published messages.
* <p>
* The benefit of such is that it is VERY high performance and generates zero garbage on the heap.
*
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
* @param dispatchQueue Specified the Blocking queue implementation for managing asynchronous message publication
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode, final BlockingQueue<MessageHolder> dispatchQueue, final int numberOfThreads) {
this.errorHandler = new ErrorHandler();
// Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
this.subscriptionManager = new SubscriptionManager(useStrongReferencesByDefault);
this.subscriptionManager = new SubscriptionManager(subscriptionMode);
switch (dispatchMode) {
case Exact:
dispatch = new DispatchExact(errorHandler, subscriptionManager);
break;
Dispatch dispatch = getDispatch(dispatchMode, errorHandler, subscriptionManager);
case ExactWithSuperTypes:
default:
dispatch = new DispatchExactWithSuperTypes(errorHandler, subscriptionManager);
break;
}
syncPublication = new Sync();
// the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue
if (useDisruptorForAsyncPublish) {
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler);
} else {
if (useZeroGarbageVersionOfABQ) {
// no garbage is created, but this is slow (but faster than other messagebus implementations)
asyncPublication = new AsyncABQ_noGc(numberOfThreads, errorHandler);
}
else {
// garbage is created, but this is fast
asyncPublication = new AsyncABQ(numberOfThreads, errorHandler);
}
}
}
/**
* Always return at least 2 threads
*/
private static
int getMinNumberOfThreads(final int numberOfThreads) {
if (numberOfThreads < 2) {
return 2;
}
return numberOfThreads;
syncPublication = new Sync(dispatch);
asyncPublication = new Async(numberOfThreads, dispatch, dispatchQueue, errorHandler);
}
@ -216,7 +221,6 @@ class MessageBus implements IMessageBus {
* Subscribe all handlers of the given listener. Any listener is only subscribed once and
* subsequent subscriptions of an already subscribed listener will be silently ignored
*/
@Override
public
void subscribe(final Object listener) {
if (listener == null) {
@ -230,14 +234,13 @@ class MessageBus implements IMessageBus {
/**
* Immediately remove all registered message handlers (if any) of the given listener.
*
* <p>
* When this call returns all handlers have effectively been removed and will not
* receive any messages (provided that running publications/iterators in other threads
* have not yet obtained a reference to the listener)
* <p>
* A call to this method passing any object that is not subscribed will not have any effect and is silently ignored.
*/
@Override
public
void unsubscribe(final Object listener) {
if (listener == null) {
@ -250,88 +253,97 @@ class MessageBus implements IMessageBus {
/**
* Synchronously publish a message to all registered listeners. This includes listeners
* defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered
* listeners have been notified (invoked) of the message.
* Synchronously publish a message to all registered listeners.
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
@Override
public
void publish(final Object message1) {
syncPublication.publish(dispatch, message1);
void publish(final Object message) {
syncPublication.publish(message);
}
/**
* Synchronously publish <b>TWO</b> messages to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
* Synchronously publish <b>TWO</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
@Override
public
void publish(final Object message1, final Object message2) {
syncPublication.publish(dispatch, message1, message2);
syncPublication.publish(message1, message2);
}
/**
* Synchronously publish <b>THREE</b> messages to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
* Synchronously publish <b>THREE</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
syncPublication.publish(dispatch, message1, message2, message3);
syncPublication.publish(message1, message2, message3);
}
/**
* Publish the message asynchronously to all registered listeners (that match the signature). This includes
* listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes. This call returns immediately.
* <i>Asynchronously</i> publish the message to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes.
* <p>
* This call returns immediately.
*/
@Override
public
void publishAsync(final Object message) {
asyncPublication.publish(dispatch, message);
asyncPublication.publish(message);
}
/**
* Publish <b>TWO</b> messages asynchronously to all registered listeners (that match the signature). This
* <i>Asynchronously</i> publish <b>TWO</b> messages to all registered listeners (that match the signature).
* <p>
* This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. This call returns immediately.
* to reject valid subtypes.
* <p>
* This call returns immediately.
*/
@Override
public
void publishAsync(final Object message1, final Object message2) {
asyncPublication.publish(dispatch, message1, message2);
asyncPublication.publish(message1, message2);
}
/**
* Publish <b>THREE</b> messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes. This call returns immediately.
* <i>Asynchronously</i> publish <b>THREE</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes.
* <p>
* This call returns immediately.
*/
@Override
public
void publishAsync(final Object message1, final Object message2, final Object message3) {
asyncPublication.publish(dispatch, message1, message2, message3);
asyncPublication.publish(message1, message2, message3);
}
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
*
* <p>
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*/
@Override
public
void addErrorHandler(final IPublicationErrorHandler errorHandler) {
this.errorHandler.addErrorHandler(errorHandler);
@ -339,23 +351,24 @@ class MessageBus implements IMessageBus {
/**
* Check whether any asynchronous message publications are pending to be processed
* Check whether any asynchronous message publications are pending to be processed.
* <p>
* Because of the nature of MULTI-THREADED, ASYNCHRONOUS environments, it is ** MORE THAN LIKELY ** this will not be an accurate reflection of the current state.
*
* @return true if any unfinished message publications are found
* @return true if there are still message publications waiting to be processed.
*/
@Override
public final
boolean hasPendingMessages() {
return asyncPublication.hasPendingMessages();
}
/**
* Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and
* other internally used threads will be shutdown gracefully.
* <p>
* After calling shutdown it is not safe to further use the message bus.
*/
@Override
public
void shutdown() {
this.syncPublication.shutdown();

View File

@ -0,0 +1,389 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus;
import java.util.concurrent.BlockingQueue;
import org.vibur.objectpool.PoolService;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.dispatch.DispatchCancel;
import dorkbox.messageBus.dispatch.DispatchExact;
import dorkbox.messageBus.dispatch.DispatchExactWithSuperTypes;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.error.IPublicationErrorHandler;
import dorkbox.messageBus.subscription.SubscriptionManager;
import dorkbox.messageBus.synchrony.AsyncZeroGC;
import dorkbox.messageBus.synchrony.MessageHolderZeroGC;
import dorkbox.messageBus.synchrony.SyncZeroGC;
import dorkbox.messageBus.synchrony.SynchronyZeroGC;
/**
* A message bus with ZERO GC capabilities offers facilities for publishing messages to the message handlers of registered listeners.
* <p/>
* Zero GC is possible through the useage of object pools. In our specific case, we use the Vibur Object Pool, because
* it is one of the fastest AND it supports the usage of the Conversant Disruptor.
* <p/>
*
* Because the message bus keeps track of classes that are subscribed and published, reloading the classloader means that you will need to
* SHUTDOWN the messagebus when you unload the classloader, and then re-subscribe relevant classes when you reload the classes.
* <p/>
*
* Messages can be published synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* Message handlers can be invoked synchronously or asynchronously depending on their configuration. Thus, there
* are two notions of synchronicity / asynchronicity. One on the caller side, e.g. the invocation of the message publishing
* methods. The second on the handler side, e.g. whether the handler is invoked in the same or a different thread.
*
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages publish published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* in a synchronized fashion using @Synchronized annotation
*
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Handler annotation.
*
* <p/>
* By default, the bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages publish dispatched. This can be changed using the @Listener annotation.
*
* <p/>
* Generally message handlers will be invoked in inverse sequence of subscription but any
* client using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the respective message handlers.
*
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message.
*
* <p/>
* You may cancel any further dispatch of a message via {@link #cancel()}
*
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any message listener may only be subscribed once -> subsequent subscriptions of an already subscribed message listener
* will be silently ignored)
*
* <p/>
* Removing a listener (unsubscribing) means removing all subscribed message handlers of that listener. This remove operation
* immediately takes effect and on all running dispatch processes -> A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
* after the remove operation completed.
*
* <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* publish dispatched to all message handlers that take an instance of List as their parameter
*
*
*
* See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
* TLDR: we use single-writer-principle + lazySet/get for major performance
*
* @author bennidi
* Date: 2/8/12
* @author dorkbox, llc
* Date: 2/2/15
*/
@SuppressWarnings("WeakerAccess")
public
class MessageBusZeroGC {
/**
* Gets the version number.
*/
public static
String getVersion() {
return "1.20";
}
/**
* Cancels the publication of the message or messages. Only applicable for the current dispatched message.
* <p>
* No more subscribers for this message will be called.
*/
public static
void cancel() {
throw new DispatchCancel();
}
private static
Dispatch getDispatch(final DispatchMode dispatchMode, final ErrorHandler errorHandler, final SubscriptionManager subscriptionManager) {
if (dispatchMode == DispatchMode.Exact) {
return new DispatchExact(errorHandler, subscriptionManager);
}
return new DispatchExactWithSuperTypes(errorHandler, subscriptionManager);
}
private final ErrorHandler errorHandler;
private final SubscriptionManager subscriptionManager;
private final SynchronyZeroGC syncPublication;
private final SynchronyZeroGC asyncPublication;
/**
* Will permit subType matching for matching what subscription handles which message
* <p>
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous message publication
* <p>
* Will use half of CPUs available for dispatching async messages
*/
public
MessageBusZeroGC() {
this(Runtime.getRuntime().availableProcessors()/2);
}
/**
* Will permit subType matching for matching what subscription handles which message
* <p>
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous message publication
*
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBusZeroGC(final int numberOfThreads) {
this(DispatchMode.ExactWithSuperTypes, SubscriptionMode.StrongReferences, numberOfThreads);
}
/**
* Will use half of CPUs available for dispatching async messages
* <p>
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous publication
*
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
*/
public
MessageBusZeroGC(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode) {
this(dispatchMode, subscriptionMode, Runtime.getRuntime().availableProcessors()/2);
}
/**
* Will use the Conversant Disruptor as the blocking queue implementation for asynchronous publication
*
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBusZeroGC(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode, final int numberOfThreads) {
this(dispatchMode, subscriptionMode, new DisruptorBlockingQueue<MessageHolderZeroGC>(1024, SpinPolicy.BLOCKING), numberOfThreads);
}
/**
* Will use the Conversant Disruptor for asynchronous dispatch of published messages.
* <p>
* The benefit of such is that it is VERY high performance and generates zero garbage on the heap.
*
* @param dispatchMode Specifies which Dispatch Mode (Exact or ExactWithSuperTypes) to allow what subscription hierarchies receive the publication of a message.
* @param subscriptionMode Specifies which Subscription Mode Mode (Strong or Weak) to change how subscription handlers are saved internally.
* @param dispatchQueue Specified the Blocking queue implementation for managing asynchronous message publication
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBusZeroGC(final DispatchMode dispatchMode, final SubscriptionMode subscriptionMode, final BlockingQueue<MessageHolderZeroGC> dispatchQueue, final int numberOfThreads) {
this.errorHandler = new ErrorHandler();
// Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
this.subscriptionManager = new SubscriptionManager(subscriptionMode);
Dispatch dispatch = getDispatch(dispatchMode, errorHandler, subscriptionManager);
syncPublication = new SyncZeroGC(dispatch);
asyncPublication = new AsyncZeroGC(numberOfThreads, dispatch, dispatchQueue, errorHandler);
}
/**
* Subscribe all handlers of the given listener. Any listener is only subscribed once and
* subsequent subscriptions of an already subscribed listener will be silently ignored
*/
public
void subscribe(final Object listener) {
if (listener == null) {
return;
}
// single writer principle using synchronised
subscriptionManager.subscribe(listener);
}
/**
* Immediately remove all registered message handlers (if any) of the given listener.
* <p>
* When this call returns all handlers have effectively been removed and will not
* receive any messages (provided that running publications/iterators in other threads
* have not yet obtained a reference to the listener)
* <p>
* A call to this method passing any object that is not subscribed will not have any effect and is silently ignored.
*/
public
void unsubscribe(final Object listener) {
if (listener == null) {
return;
}
// single writer principle using synchronised
subscriptionManager.unsubscribe(listener);
}
/**
* Synchronously publish a message to all registered listeners.
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
public
<T> void publish(final PoolService<T> pool, final T message) {
syncPublication.publish(pool, message);
}
/**
* Synchronously publish <b>TWO</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
public
<T1, T2> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2,
final T1 message1, final T2 message2) {
syncPublication.publish(pool1, pool2, message1, message2);
}
/**
* Synchronously publish <b>THREE</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* The call returns when all matching subscription handlers of all registered listeners have been notified (invoked) of the message.
*/
public
<T1, T2, T3> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2, final PoolService<T3> pool3,
final T1 message1, final T2 message2, final T3 message3) {
syncPublication.publish(pool1, pool2, pool3, message1, message2, message3);
}
/**
* <i>Asynchronously</i> publish the message to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured to reject
* valid subtypes.
* <p>
* This call returns immediately.
*/
public
<T> void publishAsync(final PoolService<T> pool, final T message) {
asyncPublication.publish(pool, message);
}
/**
* <i>Asynchronously</i> publish <b>TWO</b> messages to all registered listeners (that match the signature).
* <p>
* This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes.
* <p>
* This call returns immediately.
*/
public
<T1, T2> void publishAsync(final PoolService<T1> pool1, final PoolService<T2> pool2,
final T1 message1, final T2 message2) {
asyncPublication.publish(pool1, pool2, message1, message2);
}
/**
* <i>Asynchronously</i> publish <b>THREE</b> messages to all registered listeners (that match the signature).
* <p>
* This includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes.
* <p>
* This call returns immediately.
*/
public
<T1, T2, T3> void publishAsync(final PoolService<T1> pool1, final PoolService<T2> pool2, final PoolService<T3> pool3,
final T1 message1, final T2 message2, final T3 message3) {
asyncPublication.publish(pool1, pool2, pool3, message1, message2, message3);
}
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* <p>
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*/
public
void addErrorHandler(final IPublicationErrorHandler errorHandler) {
this.errorHandler.addErrorHandler(errorHandler);
}
/**
* Check whether any asynchronous message publications are pending to be processed.
* <p>
* Because of the nature of MULTI-THREADED, ASYNCHRONOUS environments, it is ** MORE THAN LIKELY ** this will not be an accurate reflection of the current state.
*
* @return true if there are still message publications waiting to be processed.
*/
public final
boolean hasPendingMessages() {
return asyncPublication.hasPendingMessages();
}
/**
* Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and
* other internally used threads will be shutdown gracefully.
* <p>
* After calling shutdown it is not safe to further use the message bus.
*/
public
void shutdown() {
this.syncPublication.shutdown();
this.asyncPublication.shutdown();
this.subscriptionManager.shutdown();
}
}

View File

@ -1,8 +1,20 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus;
/**
*
*/
public
enum SubscriptionMode {
/**

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony.disruptor;
package dorkbox.messageBus.common;
/**
* @author dorkbox, llc Date: 2/2/15

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016 dorkbox, llc
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,13 +18,13 @@ package dorkbox.messageBus.synchrony;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.LockSupport;
import dorkbox.messageBus.common.MessageType;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.error.PublicationError;
import dorkbox.messageBus.synchrony.disruptor.MessageType;
import dorkbox.util.NamedThreadFactory;
/**
@ -32,17 +32,25 @@ import dorkbox.util.NamedThreadFactory;
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
* This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it
* generates garbage (while the disruptor version does not).
*
* Basically, the disruptor is fast + noGC.
*
* @author dorkbox, llc Date: 2/3/16
*/
public final
class AsyncABQ implements Synchrony {
public
class Async implements Synchrony {
/**
* Always return at least 2 threads
*/
private static
int getMinNumberOfThreads(final int numberOfThreads) {
if (numberOfThreads < 2) {
return 2;
}
return numberOfThreads;
}
private final Dispatch dispatch;
private final BlockingQueue<MessageHolder> dispatchQueue;
private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
private final Collection<Thread> threads;
private final Collection<Boolean> shutdown;
private final ErrorHandler errorHandler;
@ -54,22 +62,28 @@ class AsyncABQ implements Synchrony {
public
AsyncABQ(final int numberOfThreads, final ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
Async(int numberOfThreads, final Dispatch dispatch, final BlockingQueue<MessageHolder> dispatchQueue, final ErrorHandler errorHandler) {
this.dispatch = dispatch;
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
// ALWAYS round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
this.errorHandler = errorHandler;
this.dispatchQueue = dispatchQueue;
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings({"ConstantConditions", "UnnecessaryLocalVariable"})
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ.this.dispatchQueue;
final ErrorHandler errorHandler1 = errorHandler;
final Async outsideThis = Async.this;
while (!AsyncABQ.this.shuttingDown) {
process(IN_QUEUE, errorHandler1);
final Dispatch dispatch = outsideThis.dispatch;
final BlockingQueue<MessageHolder> queue = outsideThis.dispatchQueue;
final ErrorHandler errorHandler = outsideThis.errorHandler;
while (!outsideThis.shuttingDown) {
process(dispatch, queue, errorHandler);
}
synchronized (shutdown) {
@ -91,73 +105,62 @@ class AsyncABQ implements Synchrony {
@SuppressWarnings("Duplicates")
private
void process(final ArrayBlockingQueue<MessageHolder> queue, final ErrorHandler errorHandler) {
MessageHolder event;
int messageType = MessageType.ONE;
Dispatch dispatch;
Object message1 = null;
Object message2 = null;
Object message3 = null;
void process(final Dispatch dispatch, final BlockingQueue<MessageHolder> queue, final ErrorHandler errorHandler) {
try {
event = queue.take();
messageType = event.type;
dispatch = event.dispatch;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
MessageHolder message = queue.take();
int messageType = message.type;
switch (messageType) {
case MessageType.ONE: {
dispatch.publish(message1);
try {
dispatch.publish(message.message1);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1));
}
return;
}
case MessageType.TWO: {
dispatch.publish(message1, message2);
try {
dispatch.publish(message.message1, message.message2);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1, message.message2));
}
return;
}
case MessageType.THREE: {
dispatch.publish(message1, message2, message3);
try {
dispatch.publish(message.message1, message.message2, message.message3);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1, message.message2, message.message3));
}
//noinspection UnnecessaryReturnStatement
return;
}
}
} catch (InterruptedException e) {
if (!this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1));
return;
}
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1, message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement
return;
}
}
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted exception during message dequeue.")
.setCause(e)
.setNoPublishedObject());
}
}
}
@Override
public
void publish(final Dispatch dispatch, final Object message1) {
void publish(final Object message1) {
MessageHolder job = new MessageHolder();
job.type = MessageType.ONE;
job.dispatch = dispatch;
job.message1 = message1;
@ -172,11 +175,10 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
void publish(final Object message1, final Object message2) {
MessageHolder job = new MessageHolder();
job.type = MessageType.TWO;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;
@ -192,11 +194,10 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
void publish(final Object message1, final Object message2, final Object message3) {
MessageHolder job = new MessageHolder();
job.type = MessageType.THREE;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;

View File

@ -1,217 +0,0 @@
/*
* Copyright 2016 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.synchrony.disruptor.EventBusFactory;
import dorkbox.messageBus.synchrony.disruptor.MessageHandler;
import dorkbox.messageBus.synchrony.disruptor.MessageType;
import dorkbox.messageBus.synchrony.disruptor.PublicationExceptionHandler;
import dorkbox.util.NamedThreadFactory;
/**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
*
* @author dorkbox, llc Date: 2/3/16
*/
public final
class AsyncDisruptor implements Synchrony {
private final WorkProcessor<MessageHolder>[] workProcessors;
private final MessageHandler[] handlers;
private final RingBuffer<MessageHolder> ringBuffer;
private final Sequence workSequence;
public
AsyncDisruptor(final int numberOfThreads, final ErrorHandler errorHandler) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new LinkedBlockingQueue<Runnable>(), // also, this doesn't matter
new NamedThreadFactory("MessageBus"));
final PublicationExceptionHandler<MessageHolder> exceptionHandler = new PublicationExceptionHandler<MessageHolder>(errorHandler);
EventBusFactory factory = new EventBusFactory();
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(); // exactly one per thread is used
}
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
final int BUFFER_SIZE = 32;
// final int BUFFER_SIZE = 16;
// final int BUFFER_SIZE = 8;
// final int BUFFER_SIZE = 4;
WaitStrategy consumerWaitStrategy;
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good blocking one
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy(); // best for low latency
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); // good combo
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
final int numWorkers = handlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer,
sequenceBarrier,
handlers[i],
exceptionHandler, workSequence);
}
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence()
.set(cursor);
executor.execute(processor);
}
}
@Override
public
void publish(final Dispatch dispatch, final Object message1) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE;
job.dispatch = dispatch;
job.message1 = message1;
ringBuffer.publish(seq);
}
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.TWO;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;
ringBuffer.publish(seq);
}
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.THREE;
job.dispatch = dispatch;
job.message1 = message1;
job.message3 = message2;
job.message2 = message3;
ringBuffer.publish(seq);
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[workProcessors.length + 1];
for (int i = 0, size = workProcessors.length; i < size; i++) {
sequences[i] = workProcessors[i].getSequence();
}
sequences[sequences.length - 1] = workSequence; // always add the work sequence
return sequences;
}
@Override
public
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
}
@Override
public
void shutdown() {
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016 dorkbox, llc
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,13 +18,17 @@ package dorkbox.messageBus.synchrony;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.LockSupport;
import org.vibur.objectpool.ConcurrentPool;
import org.vibur.objectpool.PoolService;
import org.vibur.objectpool.util.MultithreadConcurrentQueueCollection;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.error.PublicationError;
import dorkbox.messageBus.synchrony.disruptor.MessageType;
import dorkbox.messageBus.common.MessageType;
import dorkbox.util.NamedThreadFactory;
/**
@ -40,12 +44,13 @@ import dorkbox.util.NamedThreadFactory;
* @author dorkbox, llc Date: 2/3/16
*/
public final
class AsyncABQ_noGc implements Synchrony {
class AsyncZeroGC implements SynchronyZeroGC {
private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
private final Dispatch dispatch;
private final BlockingQueue<MessageHolderZeroGC> dispatchQueue;
// have two queues to prevent garbage, So we pull off one queue to add to another queue and when done, we put it back
private final ArrayBlockingQueue<MessageHolder> gcQueue;
// we use a pool to prevent garbage creation.
private final PoolService<MessageHolderZeroGC> pool;
private final Collection<Thread> threads;
private final Collection<Boolean> shutdown;
@ -58,30 +63,31 @@ class AsyncABQ_noGc implements Synchrony {
public
AsyncABQ_noGc(final int numberOfThreads, final ErrorHandler errorHandler) {
AsyncZeroGC(final int numberOfThreads, final Dispatch dispatch, final BlockingQueue<MessageHolderZeroGC> dispatchQueue, final ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
this.dispatch = dispatch;
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
this.gcQueue = new ArrayBlockingQueue<MessageHolder>(1024);
this.dispatchQueue = dispatchQueue;
this.pool = new ConcurrentPool<MessageHolderZeroGC>(new MultithreadConcurrentQueueCollection<>(1024),
new MessageHolderZeroGCClassFactory(),
16, 1024, true);
// this is how we prevent garbage
for (int i = 0; i < 1024; i++) {
gcQueue.add(new MessageHolder());
}
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings({"ConstantConditions", "UnnecessaryLocalVariable"})
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ_noGc.this.dispatchQueue;
final ArrayBlockingQueue<MessageHolder> OUT_QUEUE = AsyncABQ_noGc.this.gcQueue;
AsyncZeroGC outsideThis = AsyncZeroGC.this;
final ErrorHandler errorHandler1 = errorHandler;
final Dispatch dispatch = outsideThis.dispatch;
final BlockingQueue<MessageHolderZeroGC> queue = outsideThis.dispatchQueue;
final PoolService<MessageHolderZeroGC> pool = outsideThis.pool;
final ErrorHandler errorHandler = outsideThis.errorHandler;
while (!AsyncABQ_noGc.this.shuttingDown) {
process(IN_QUEUE, OUT_QUEUE, errorHandler1);
while (!outsideThis.shuttingDown) {
process(dispatch, queue, pool, errorHandler);
}
synchronized (shutdown) {
@ -101,99 +107,101 @@ class AsyncABQ_noGc implements Synchrony {
}
}
@SuppressWarnings("Duplicates")
@SuppressWarnings({"Duplicates", "unchecked"})
private
void process(final ArrayBlockingQueue<MessageHolder> queue,
final ArrayBlockingQueue<MessageHolder> gcQueue, final ErrorHandler errorHandler) {
MessageHolder event;
int messageType = MessageType.ONE;
Dispatch dispatch;
Object message1 = null;
Object message2 = null;
Object message3 = null;
void process(final Dispatch dispatch, final BlockingQueue<MessageHolderZeroGC> queue,
final PoolService<MessageHolderZeroGC> pool, final ErrorHandler errorHandler) {
try {
event = queue.take();
messageType = event.type;
dispatch = event.dispatch;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
gcQueue.put(event);
MessageHolderZeroGC message = queue.take();
int messageType = message.type;
switch (messageType) {
case MessageType.ONE: {
dispatch.publish(message1);
try {
dispatch.publish(message.message1);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1));
} finally {
pool.restore(message);
message.pool1.restore(message.message1);
}
return;
}
case MessageType.TWO: {
dispatch.publish(message1, message2);
try {
dispatch.publish(message.message1, message.message2);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1, message.message2));
} finally {
pool.restore(message);
message.pool1.restore(message.message1);
message.pool2.restore(message.message2);
}
return;
}
case MessageType.THREE: {
dispatch.publish(message1, message2, message3);
try {
dispatch.publish(message.message1, message.message2, message.message3);
} catch (Exception e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Exception during message dequeue.")
.setCause(e)
.setPublishedObject(message.message1, message.message2, message.message3));
} finally {
pool.restore(message);
message.pool1.restore(message.message1);
message.pool2.restore(message.message2);
message.pool3.restore(message.message3);
}
//noinspection UnnecessaryReturnStatement
return;
}
}
} catch (InterruptedException e) {
if (!this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1));
return;
}
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1, message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement
return;
}
}
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted exception during message dequeue.")
.setCause(e)
.setNoPublishedObject());
}
}
}
@Override
public
void publish(final Dispatch dispatch, final Object message1) {
<T> void publish(final PoolService<T> pool, final T message) {
try {
MessageHolder job = gcQueue.take();
MessageHolderZeroGC job = this.pool.take();
job.type = MessageType.ONE;
job.dispatch = dispatch;
job.message1 = message1;
job.pool1 = pool;
job.message1 = message;
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1));
.setPublishedObject(message));
}
}
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
<T1, T2> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2,
final T1 message1, final T2 message2) {
try {
MessageHolder job = gcQueue.take();
MessageHolderZeroGC job = pool.take();
job.type = MessageType.TWO;
job.dispatch = dispatch;
job.pool1 = pool1;
job.pool2 = pool2;
job.message1 = message1;
job.message2 = message2;
@ -208,12 +216,16 @@ class AsyncABQ_noGc implements Synchrony {
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
<T1, T2, T3> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2, final PoolService<T3> pool3,
final T1 message1, final T2 message2, final T3 message3) {
try {
MessageHolder job = gcQueue.take();
MessageHolderZeroGC job = pool.take();
job.type = MessageType.THREE;
job.dispatch = dispatch;
job.pool1 = pool1;
job.pool2 = pool2;
job.pool3 = pool3;
job.message1 = message1;
job.message2 = message2;

View File

@ -15,8 +15,7 @@
*/
package dorkbox.messageBus.synchrony;
import dorkbox.messageBus.dispatch.Dispatch;
import dorkbox.messageBus.synchrony.disruptor.MessageType;
import dorkbox.messageBus.common.MessageType;
/**
* @author dorkbox, llc Date: 2/2/15
@ -24,7 +23,6 @@ import dorkbox.messageBus.synchrony.disruptor.MessageType;
public
class MessageHolder {
public int type = MessageType.ONE;
public Dispatch dispatch = null;
public Object message1 = null;
public Object message2 = null;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2015 dorkbox, llc
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,24 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony.disruptor;
package dorkbox.messageBus.synchrony;
import com.lmax.disruptor.EventFactory;
import org.vibur.objectpool.PoolService;
import dorkbox.messageBus.synchrony.MessageHolder;
import dorkbox.messageBus.common.MessageType;
/**
* @author dorkbox, llc
* Date: 2/2/15
* @author dorkbox, llc Date: 2/2/19
*/
public class EventBusFactory implements EventFactory<MessageHolder> {
public
class MessageHolderZeroGC {
public int type = MessageType.ONE;
public EventBusFactory() {
}
public PoolService pool1 = null;
public PoolService pool2 = null;
public PoolService pool3 = null;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
@Override
public
MessageHolder newInstance() {
return new MessageHolder();
}
MessageHolderZeroGC() {}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony;
import org.vibur.objectpool.PoolObjectFactory;
import dorkbox.messageBus.common.MessageType;
/**
* Factory for managing the stat of 'MessageHolder' for use by the Zero GC Async Queue.
*/
public
class MessageHolderZeroGCClassFactory implements PoolObjectFactory<MessageHolderZeroGC> {
/**
* Creates a new object for the calling object pool. This object is presumed to be ready (and valid)
* for immediate use. Should <b>never</b> return {@code null}.
*
* <p>This method will be called by the constructors of {@link ConcurrentPool}, and by any of its
* {@code take...} methods if they were able to obtain a permit from the counting {@code Semaphore}
* guarding the pool, but there was no ready and valid object in the pool. I.e., this is the case when
* a new object is created lazily in the pool upon request.
*
* @return a new object for this object pool
*/
@Override
public
MessageHolderZeroGC create() {
return new MessageHolderZeroGC();
}
/**
* A validation/activation hook which will be called by the {@code take...} methods of
* {@link ConcurrentPool} when an object from the object pool is requested by the application.
* This is an optional operation which concrete implementation may simply always return {@code true}.
*
* <p>If there is a particular activation or validation which needs to be done
* for the taken from the pool object, this is the ideal place where it can be done.
*
* @see #readyToRestore
*
* @param obj an object which is taken from the object pool and which is to be given
* to the calling application
* @return {@code true} if the validation/activation is successful, {@code false} otherwise
*/
@Override
public
boolean readyToTake(final MessageHolderZeroGC obj) {
return true;
}
/**
* A validation/passivation hook which will be called by the {@code restore} methods of
* {@link ConcurrentPool} when an object taken before that from the object pool is about to be
* restored (returned back) to the pool. This is an optional operation which concrete implementation
* may simply always return {@code true}.
*
* <p>If there is a particular passivation or validation which needs to be done
* for the restored to the pool object, this is the ideal place where it can be done.
*
* @see #readyToTake
*
* @param obj an object which has been taken before that from this object pool and which is now
* to be restored to the pool
* @return {@code true} if the validation/passivation is successful, {@code false} otherwise
*/
@Override
public
boolean readyToRestore(final MessageHolderZeroGC obj) {
return true;
}
/**
* A method which will be called when an object from the object pool needs to be destroyed,
* which is when the {@link #readyToTake} or {@link #readyToRestore} methods have returned
* {@code false}, or when the pool is shrinking its size (via calling {@code reduceCreatedBy/To}),
* or when the pool is terminating. The simplest implementation of this method may simply
* do nothing, however if there are any allocated resources associated with the to-be-destroyed
* object, like network connections or similar, this is the ideal place where they can be
* de-allocated.
*
* @param obj an object from the pool which needs to be destroyed
*/
@Override
public
void destroy(final MessageHolderZeroGC obj) {
obj.type = MessageType.ONE;
obj.pool1 = null;
obj.pool2 = null;
obj.pool3 = null;
obj.message1 = null;
obj.message2 = null;
obj.message3 = null;
}
}

View File

@ -25,25 +25,28 @@ import dorkbox.messageBus.dispatch.Dispatch;
public final
class Sync implements Synchrony {
private final Dispatch dispatch;
public
Sync() {
Sync(final Dispatch dispatch) {
this.dispatch = dispatch;
}
@Override
public
void publish(final Dispatch dispatch, final Object message1) {
void publish(final Object message1) {
dispatch.publish(message1);
}
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
void publish(final Object message1, final Object message2) {
dispatch.publish(message1, message2);
}
@Override
public
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
void publish(final Object message1, final Object message2, final Object message3) {
dispatch.publish(message1, message2, message3);
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony;
import org.vibur.objectpool.PoolService;
import dorkbox.messageBus.dispatch.Dispatch;
/**
* @author dorkbox, llc Date: 2/2/15
*/
@SuppressWarnings("Duplicates")
public final
class SyncZeroGC implements SynchronyZeroGC {
private final Dispatch dispatch;
public
SyncZeroGC(final Dispatch dispatch) {
this.dispatch = dispatch;
}
@Override
public
<T> void publish(final PoolService<T> pool, final T message) {
try {
dispatch.publish(message);
} finally {
pool.restore(message);
}
}
@Override
public
<T1, T2> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2,
final T1 message1, final T2 message2) {
try {
dispatch.publish(message1, message2);
} finally {
pool1.restore(message1);
pool2.restore(message2);
}
}
@Override
public
<T1, T2, T3> void publish(final PoolService<T1> pool1, final PoolService<T2> pool2, final PoolService<T3> pool3,
final T1 message1, final T2 message2, final T3 message3) {
try {
dispatch.publish(message1, message2, message3);
} finally {
pool1.restore(message1);
pool2.restore(message2);
pool3.restore(message3);
}
}
@Override
public
void shutdown() {
}
@Override
public
boolean hasPendingMessages() {
return false;
}
}

View File

@ -15,16 +15,14 @@
*/
package dorkbox.messageBus.synchrony;
import dorkbox.messageBus.dispatch.Dispatch;
/**
* @author dorkbox, llc Date: 2/3/16
*/
public
interface Synchrony {
void publish(Dispatch dispatch, Object message1);
void publish(Dispatch dispatch, Object message1, Object message2);
void publish(Dispatch dispatch, Object message1, Object message2, Object message3);
void publish(Object message1);
void publish(Object message1, Object message2);
void publish(Object message1, Object message2, Object message3);
void shutdown();
boolean hasPendingMessages();

View File

@ -0,0 +1,36 @@
/*
* Copyright 2019 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony;
import org.vibur.objectpool.PoolService;
/**
* @author dorkbox, llc Date: 2/3/16
*/
public
interface SynchronyZeroGC {
<T> void publish(PoolService<T> pool, T message1);
<T1, T2> void publish(PoolService<T1> pool1, PoolService<T2> pool2,
T1 message1, T2 message2);
<T1, T2, T3> void publish(PoolService<T1> pool1, PoolService<T2> pool2, PoolService<T3> pool3,
T1 message1, T2 message2, T3 message3);
void shutdown();
boolean hasPendingMessages();
}

View File

@ -1,74 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony.disruptor;
import java.util.concurrent.atomic.AtomicBoolean;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import dorkbox.messageBus.synchrony.MessageHolder;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public
MessageHandler() {
}
@Override
public
void onEvent(final MessageHolder event) throws Exception {
final int messageType = event.type;
switch (messageType) {
case MessageType.ONE: {
event.dispatch.publish(event.message1);
return;
}
case MessageType.TWO: {
event.dispatch.publish(event.message1, event.message2);
return;
}
case MessageType.THREE: {
event.dispatch.publish(event.message1, event.message2, event.message3);
//noinspection UnnecessaryReturnStatement
return;
}
}
}
@Override
public
void onStart() {
}
@Override
public synchronized
void onShutdown() {
shutdown.set(true);
}
public
boolean isShutdown() {
return shutdown.get();
}
}

View File

@ -1,53 +0,0 @@
/*
* Copyright 2016 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messageBus.synchrony.disruptor;
import com.lmax.disruptor.ExceptionHandler;
import dorkbox.messageBus.error.ErrorHandler;
import dorkbox.messageBus.error.PublicationError;
/**
* @author dorkbox, llc Date: 2/3/16
*/
public final class PublicationExceptionHandler<T> implements ExceptionHandler<T> {
private final ErrorHandler errorHandler;
public PublicationExceptionHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void handleEventException(final Throwable e, final long sequence, final T event) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")")
.setCause(e));
}
@Override
public void handleOnStartException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error starting the disruptor")
.setCause(e));
}
@Override
public void handleOnShutdownException(final Throwable e) {
this.errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error stopping the disruptor")
.setCause(e));
}
}