Added ability to cancel delivery of messages

This commit is contained in:
nathan 2017-09-17 23:19:35 +02:00
parent c92c98857b
commit 6ce0af57d3
9 changed files with 252 additions and 146 deletions

View File

@ -35,6 +35,9 @@ Table of contents:
|`@Listener`|Can be used to customize listener wide configuration like the used reference type|
|`@Synchronized`|Specifies that the handler/method will be accessed in a `synchronized` block|
> Canceling message delivery
Messages can be canceled (but only in the same thread they are on), and any further deliveries for that message will be cancelled; subsequent subscribers wont receive the message. Call via `MessageBus.cancel()`
> Delivers everything
Messages do not need to implement any interface and can be of any type. It is possible though to define an upper bound of the message type using generics. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a `DeadMessage` object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage.

View File

@ -16,6 +16,7 @@
package dorkbox.messagebus;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.dispatch.DispatchCancel;
import dorkbox.messagebus.dispatch.DispatchExact;
import dorkbox.messagebus.dispatch.DispatchExactWithSuperTypes;
import dorkbox.messagebus.error.ErrorHandler;
@ -111,6 +112,16 @@ class MessageBus implements IMessageBus {
return "1.19";
}
/**
* Cancels the publication of the message (or messages). Only applicable for the currently running thread. No more subscribers for
* this message will be called.
*/
public static
void cancel() {
throw new DispatchCancel();
}
private final ErrorHandler errorHandler;
private final SubscriptionManager subscriptionManager;
@ -338,7 +349,6 @@ class MessageBus implements IMessageBus {
return asyncPublication.hasPendingMessages();
}
/**
* Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and
* other internally used threads will be shutdown gracefully.

View File

@ -0,0 +1,27 @@
/*
* Copyright 2017 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.dispatch;
/**
* Cancels the publication of the message (or messages). Only applicable for the currently running thread. No more subscribers for
* this message will be called.
*/
public
class DispatchCancel extends RuntimeException {
public
DispatchCancel() {
}
}

View File

@ -54,26 +54,30 @@ class DispatchExact implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for this specific message
}
}
@ -92,26 +96,30 @@ class DispatchExact implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for these specific messages
}
}
@ -131,26 +139,30 @@ class DispatchExact implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for these specific messages
}
}
}

View File

@ -52,35 +52,39 @@ class DispatchExactWithSuperTypes implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for this specific message
}
}
@ -100,35 +104,39 @@ class DispatchExactWithSuperTypes implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for these specific messages
}
}
@ -149,35 +157,39 @@ class DispatchExactWithSuperTypes implements Dispatch {
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
try {
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
} catch (DispatchCancel ignored) {
// we wanted to cancel the dispatch for these specific messages
}
}
}

View File

@ -37,15 +37,17 @@
*/
package dorkbox.messagebus.subscription.asm;
import java.lang.reflect.Method;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.dispatch.DispatchCancel;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription;
import java.lang.reflect.Method;
/**
* A subscription is a container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
@ -109,6 +111,9 @@ class SubscriptionAsmStrong extends Subscription<Object> {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -135,6 +140,9 @@ class SubscriptionAsmStrong extends Subscription<Object> {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -161,6 +169,9 @@ class SubscriptionAsmStrong extends Subscription<Object> {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)

View File

@ -37,16 +37,18 @@
*/
package dorkbox.messagebus.subscription.asm;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.dispatch.DispatchCancel;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
/**
* A subscription is a container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
@ -139,6 +141,9 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -175,6 +180,9 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -211,6 +219,9 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)

View File

@ -37,14 +37,15 @@
*/
package dorkbox.messagebus.subscription.reflection;
import java.lang.reflect.Method;
import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.dispatch.DispatchCancel;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription;
import java.lang.reflect.Method;
/**
* A subscription is a container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
@ -101,6 +102,9 @@ class SubscriptionReflectionStrong extends Subscription<Object> {
try {
invocation.invoke(listener, method, message);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -126,6 +130,9 @@ class SubscriptionReflectionStrong extends Subscription<Object> {
try {
invocation.invoke(listener, method, message1, message2);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -151,6 +158,9 @@ class SubscriptionReflectionStrong extends Subscription<Object> {
try {
invocation.invoke(listener, method, message1, message2, message3);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)

View File

@ -37,15 +37,16 @@
*/
package dorkbox.messagebus.subscription.reflection;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.dispatch.DispatchCancel;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
/**
* A subscription is a container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
@ -131,6 +132,9 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, method, message);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -166,6 +170,9 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, method, message1, message2);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
@ -201,6 +208,9 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
try {
invocation.invoke(listener, method, message1, message2, message3);
} catch (DispatchCancel e) {
// we want to cancel the dispatch for this specific message
throw e;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)