From 6ce0af57d302ec5ffd4fd4e635a89cdb0ddf51f2 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 17 Sep 2017 23:19:35 +0200 Subject: [PATCH] Added ability to cancel delivery of messages --- README.md | 3 + src/dorkbox/messagebus/MessageBus.java | 12 +- .../messagebus/dispatch/DispatchCancel.java | 27 +++ .../messagebus/dispatch/DispatchExact.java | 120 ++++++------ .../dispatch/DispatchExactWithSuperTypes.java | 174 ++++++++++-------- .../asm/SubscriptionAsmStrong.java | 15 +- .../subscription/asm/SubscriptionAsmWeak.java | 17 +- .../SubscriptionReflectionStrong.java | 14 +- .../SubscriptionReflectionWeak.java | 16 +- 9 files changed, 252 insertions(+), 146 deletions(-) create mode 100644 src/dorkbox/messagebus/dispatch/DispatchCancel.java diff --git a/README.md b/README.md index 8481049..c080661 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,9 @@ Table of contents: |`@Listener`|Can be used to customize listener wide configuration like the used reference type| |`@Synchronized`|Specifies that the handler/method will be accessed in a `synchronized` block| +> Canceling message delivery +Messages can be canceled (but only in the same thread they are on), and any further deliveries for that message will be cancelled; subsequent subscribers won’t receive the message. Call via `MessageBus.cancel()` + > Delivers everything Messages do not need to implement any interface and can be of any type. It is possible though to define an upper bound of the message type using generics. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a `DeadMessage` object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage. diff --git a/src/dorkbox/messagebus/MessageBus.java b/src/dorkbox/messagebus/MessageBus.java index 648546c..453f0a7 100644 --- a/src/dorkbox/messagebus/MessageBus.java +++ b/src/dorkbox/messagebus/MessageBus.java @@ -16,6 +16,7 @@ package dorkbox.messagebus; import dorkbox.messagebus.dispatch.Dispatch; +import dorkbox.messagebus.dispatch.DispatchCancel; import dorkbox.messagebus.dispatch.DispatchExact; import dorkbox.messagebus.dispatch.DispatchExactWithSuperTypes; import dorkbox.messagebus.error.ErrorHandler; @@ -111,6 +112,16 @@ class MessageBus implements IMessageBus { return "1.19"; } + /** + * Cancels the publication of the message (or messages). Only applicable for the currently running thread. No more subscribers for + * this message will be called. + */ + public static + void cancel() { + throw new DispatchCancel(); + } + + private final ErrorHandler errorHandler; private final SubscriptionManager subscriptionManager; @@ -338,7 +349,6 @@ class MessageBus implements IMessageBus { return asyncPublication.hasPendingMessages(); } - /** * Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and * other internally used threads will be shutdown gracefully. diff --git a/src/dorkbox/messagebus/dispatch/DispatchCancel.java b/src/dorkbox/messagebus/dispatch/DispatchCancel.java new file mode 100644 index 0000000..f988e57 --- /dev/null +++ b/src/dorkbox/messagebus/dispatch/DispatchCancel.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.messagebus.dispatch; + +/** + * Cancels the publication of the message (or messages). Only applicable for the currently running thread. No more subscribers for + * this message will be called. + */ +public +class DispatchCancel extends RuntimeException { + public + DispatchCancel() { + } +} diff --git a/src/dorkbox/messagebus/dispatch/DispatchExact.java b/src/dorkbox/messagebus/dispatch/DispatchExact.java index 303c3d5..e4522ff 100644 --- a/src/dorkbox/messagebus/dispatch/DispatchExact.java +++ b/src/dorkbox/messagebus/dispatch/DispatchExact.java @@ -54,26 +54,30 @@ class DispatchExact implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); } } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for this specific message } } @@ -92,26 +96,30 @@ class DispatchExact implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); } } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for these specific messages } } @@ -131,26 +139,30 @@ class DispatchExact implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2, message3); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); } } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for these specific messages } } } diff --git a/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java b/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java index 954ecc5..7522d18 100644 --- a/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java +++ b/src/dorkbox/messagebus/dispatch/DispatchExactWithSuperTypes.java @@ -52,35 +52,39 @@ class DispatchExactWithSuperTypes implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1); - } - } - - if ((subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); } } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for this specific message } } @@ -100,35 +104,39 @@ class DispatchExactWithSuperTypes implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2); - } - } - - if ((subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); } } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for these specific messages } } @@ -149,35 +157,39 @@ class DispatchExactWithSuperTypes implements Dispatch { int subLength; boolean hasSubs = false; - // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. - if (subscriptions != null && (subLength = subscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = subscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2, message3); - } - } - - if ((subLength = superSubscriptions.length) > 0) { - // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. - // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered - for (int i = 0; i < subLength; i++) { - sub = superSubscriptions[i]; - hasSubs |= sub.publish(errorHandler, message1, message2, message3); - } - } - - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(errorHandler, deadMessage); + try { + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); } } + + if ((subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } + } + } catch (DispatchCancel ignored) { + // we wanted to cancel the dispatch for these specific messages } } } diff --git a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java index 52d0465..f0cc652 100644 --- a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java +++ b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java @@ -37,15 +37,17 @@ */ package dorkbox.messagebus.subscription.asm; +import java.lang.reflect.Method; + import com.esotericsoftware.reflectasm.MethodAccess; + import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.dispatch.DispatchCancel; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; -import java.lang.reflect.Method; - /** * A subscription is a container that manages exactly one message handler of all registered * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message @@ -109,6 +111,9 @@ class SubscriptionAsmStrong extends Subscription { try { invocation.invoke(listener, handler, handleIndex, message); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -135,6 +140,9 @@ class SubscriptionAsmStrong extends Subscription { try { invocation.invoke(listener, handler, handleIndex, message1, message2); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -161,6 +169,9 @@ class SubscriptionAsmStrong extends Subscription { try { invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) diff --git a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java index fe89266..c1284ab 100644 --- a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java +++ b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java @@ -37,16 +37,18 @@ */ package dorkbox.messagebus.subscription.asm; +import java.lang.ref.WeakReference; +import java.lang.reflect.Method; + import com.esotericsoftware.reflectasm.MethodAccess; + import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.dispatch.DispatchCancel; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; -import java.lang.ref.WeakReference; -import java.lang.reflect.Method; - /** * A subscription is a container that manages exactly one message handler of all registered * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message @@ -139,6 +141,9 @@ class SubscriptionAsmWeak extends Subscription> { try { invocation.invoke(listener, handler, handleIndex, message); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -175,6 +180,9 @@ class SubscriptionAsmWeak extends Subscription> { try { invocation.invoke(listener, handler, handleIndex, message1, message2); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -211,6 +219,9 @@ class SubscriptionAsmWeak extends Subscription> { try { invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) diff --git a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java index 89d5638..b1e242c 100644 --- a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java +++ b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java @@ -37,14 +37,15 @@ */ package dorkbox.messagebus.subscription.reflection; +import java.lang.reflect.Method; + import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.dispatch.DispatchCancel; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; -import java.lang.reflect.Method; - /** * A subscription is a container that manages exactly one message handler of all registered * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message @@ -101,6 +102,9 @@ class SubscriptionReflectionStrong extends Subscription { try { invocation.invoke(listener, method, message); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -126,6 +130,9 @@ class SubscriptionReflectionStrong extends Subscription { try { invocation.invoke(listener, method, message1, message2); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -151,6 +158,9 @@ class SubscriptionReflectionStrong extends Subscription { try { invocation.invoke(listener, method, message1, message2, message3); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) diff --git a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java index 4333ca9..fe77597 100644 --- a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java +++ b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java @@ -37,15 +37,16 @@ */ package dorkbox.messagebus.subscription.reflection; +import java.lang.ref.WeakReference; +import java.lang.reflect.Method; + import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.dispatch.DispatchCancel; import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; -import java.lang.ref.WeakReference; -import java.lang.reflect.Method; - /** * A subscription is a container that manages exactly one message handler of all registered * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message @@ -131,6 +132,9 @@ class SubscriptionReflectionWeak extends Subscription> { try { invocation.invoke(listener, method, message); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -166,6 +170,9 @@ class SubscriptionReflectionWeak extends Subscription> { try { invocation.invoke(listener, method, message1, message2); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e) @@ -201,6 +208,9 @@ class SubscriptionReflectionWeak extends Subscription> { try { invocation.invoke(listener, method, message1, message2, message3); + } catch (DispatchCancel e) { + // we want to cancel the dispatch for this specific message + throw e; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") .setCause(e)