From 49933e9219351c156e51b9d4e83650b0cc85bba1 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 7 Feb 2016 19:32:24 +0100 Subject: [PATCH] Fixed issues with dead message publication - we now detect if there are listeners are actually subscribed, and publish dead messages when there are none detected. Primary subscriptions/superSubscriptions are now 'gotten' by the calling thread (DeadMessage subscriptions are 'gotten' lazily. Error handling is now improved - a single error in a collection of "same type" subscriptions will not cause all of them to abort (only the currently called method will). --- src/dorkbox/messagebus/MessageBus.java | 4 +- .../publication/PublisherExact.java | 93 +++--------- .../PublisherExactWithSuperTypes.java | 133 +++-------------- .../messagebus/subscription/Subscription.java | 9 +- .../asm/SubscriptionAsmStrong.java | 51 +++++-- .../subscription/asm/SubscriptionAsmWeak.java | 52 +++++-- .../SubscriptionReflectionStrong.java | 51 +++++-- .../SubscriptionReflectionWeak.java | 52 +++++-- .../messagebus/synchrony/AsyncABQ.java | 71 ++++++--- .../messagebus/synchrony/AsyncABQ_noGc.java | 94 ++++++++---- .../messagebus/synchrony/AsyncDisruptor.java | 22 ++- .../messagebus/synchrony/MessageHolder.java | 3 +- src/dorkbox/messagebus/synchrony/Sync.java | 139 ++++++++++++++++-- .../messagebus/synchrony/Synchrony.java | 6 +- .../synchrony/disruptor/MessageHandler.java | 59 ++------ 15 files changed, 482 insertions(+), 357 deletions(-) diff --git a/src/dorkbox/messagebus/MessageBus.java b/src/dorkbox/messagebus/MessageBus.java index d9664d9..fbdb335 100644 --- a/src/dorkbox/messagebus/MessageBus.java +++ b/src/dorkbox/messagebus/MessageBus.java @@ -156,7 +156,7 @@ class MessageBus implements IMessageBus { switch (publishMode) { case Exact: - publisher = new PublisherExact(errorHandler, subscriptionManager); + publisher = new PublisherExact(subscriptionManager); break; case ExactWithSuperTypes: @@ -165,7 +165,7 @@ class MessageBus implements IMessageBus { break; } - syncPublication = new Sync(); + syncPublication = new Sync(errorHandler, subscriptionManager); // the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue if (useDisruptorForAsyncPublish) { diff --git a/src/dorkbox/messagebus/publication/PublisherExact.java b/src/dorkbox/messagebus/publication/PublisherExact.java index 4c14ea5..3d8fc74 100644 --- a/src/dorkbox/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/messagebus/publication/PublisherExact.java @@ -15,109 +15,52 @@ */ package dorkbox.messagebus.publication; -import dorkbox.messagebus.error.DeadMessage; -import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.synchrony.Synchrony; +/** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + */ @SuppressWarnings("Duplicates") public class PublisherExact implements Publisher { - private final ErrorHandler errorHandler; private final SubscriptionManager subManager; public - PublisherExact(final ErrorHandler errorHandler, final SubscriptionManager subManager) { - this.errorHandler = errorHandler; + PublisherExact(final SubscriptionManager subManager) { this.subManager = subManager; } @Override public void publish(final Synchrony synchrony, final Object message1) { - try { - final Class messageClass = message1.getClass(); + final Class messageClass = message1.getClass(); - final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null - - // Run subscriptions - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - synchrony.publish(subscriptions, message1); - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1)); - } + final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null + synchrony.publish(subscriptions, null, message1); } @Override public void publish(final Synchrony synchrony, final Object message1, final Object message2) { - try { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null - - // Run subscriptions - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - synchrony.publish(subscriptions, message1, message2); - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2)); - } + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null + synchrony.publish(subscriptions, null, message1, message2); } @Override public void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { - try { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - final Class messageClass3 = message3.getClass(); + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + final Class messageClass3 = message3.getClass(); - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null - - // Run subscriptions - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - synchrony.publish(subscriptions, message1, message2, message3); - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null + synchrony.publish(subscriptions, null, message1, message2, message3); } } diff --git a/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java index 5da3117..c4be025 100644 --- a/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/messagebus/publication/PublisherExactWithSuperTypes.java @@ -15,9 +15,7 @@ */ package dorkbox.messagebus.publication; -import dorkbox.messagebus.error.DeadMessage; import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.synchrony.Synchrony; @@ -38,132 +36,37 @@ class PublisherExactWithSuperTypes implements Publisher { @Override public void publish(final Synchrony synchrony, final Object message1) { - try { - final SubscriptionManager subManager = this.subManager; - final Class message1Class = message1.getClass(); - boolean hasSubs = false; + final SubscriptionManager subManager = this.subManager; + final Class messageClass1 = message1.getClass(); + final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null - // Run subscriptions - final Subscription[] subscriptions = subManager.getSubs(message1Class); // can return null - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(subscriptions, message1); - } - - // Run superSubscriptions - final Subscription[] superSubscriptions = subManager.getSuperSubs(message1Class); // NOT return null - if (superSubscriptions.length > 0) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(superSubscriptions, message1); - } - - - // Run dead message subscriptions - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") - .setCause(e) - .setPublishedObject(message1)); - } + synchrony.publish(subscriptions, superSubscriptions, message1); } @Override public void publish(final Synchrony synchrony, final Object message1, final Object message2) { - try { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - boolean hasSubs = false; + final SubscriptionManager subManager = this.subManager; + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); - - // Run subscriptions - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(subscriptions, message1, message2); - } - - - // Run superSubscriptions - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null - if (superSubscriptions.length > 0) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(superSubscriptions, message1, message2); - } - - - // Run dead message subscriptions - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2)); - } + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null + synchrony.publish(subscriptions, superSubscriptions, message1, message2); } @Override public void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { - try { - final Class messageClass1 = message1.getClass(); - final Class messageClass2 = message2.getClass(); - final Class messageClass3 = message3.getClass(); - boolean hasSubs = false; + final SubscriptionManager subManager = this.subManager; + final Class messageClass1 = message1.getClass(); + final Class messageClass2 = message2.getClass(); + final Class messageClass3 = message3.getClass(); - - // Run subscriptions - final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null - if (subscriptions != null) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(subscriptions, message1, message2, message3); - } - - - // Run superSubscriptions - final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null - if (superSubscriptions.length > 0) { - // this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async - // nature of publication - hasSubs = true; - synchrony.publish(superSubscriptions, message1, message2, message3); - } - - - // Run dead message subscriptions - if (!hasSubs) { - // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null - if (deadSubscriptions != null) { - synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3)); - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } + final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null + final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null + synchrony.publish(subscriptions, superSubscriptions, message1, message2, message3); } } diff --git a/src/dorkbox/messagebus/subscription/Subscription.java b/src/dorkbox/messagebus/subscription/Subscription.java index 5738fef..9958361 100644 --- a/src/dorkbox/messagebus/subscription/Subscription.java +++ b/src/dorkbox/messagebus/subscription/Subscription.java @@ -17,6 +17,7 @@ package dorkbox.messagebus.subscription; import com.esotericsoftware.kryo.util.IdentityMap; import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.error.ErrorHandler; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -84,7 +85,7 @@ class Subscription { } public abstract - Entry createEntry(final Object listener, final Entry head); + Entry createEntry(final Object listener, final Entry head); /** * single writer principle! @@ -151,13 +152,13 @@ class Subscription { } public abstract - void publish(final Object message) throws Throwable; + boolean publish(final ErrorHandler errorHandler, final Object message); public abstract - void publish(final Object message1, final Object message2) throws Throwable; + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2); public abstract - void publish(final Object message1, final Object message2, final Object message3) throws Throwable; + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3); @Override diff --git a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java index 93d2385..52d0465 100644 --- a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java +++ b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmStrong.java @@ -39,6 +39,8 @@ package dorkbox.messagebus.subscription.asm; import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; @@ -87,58 +89,85 @@ class SubscriptionAsmStrong extends Subscription { @Override public - Entry createEntry(final Object listener, final Entry head) { - return new Entry(listener, (Entry)head); + Entry createEntry(final Object listener, final Entry head) { + return new Entry(listener, head); } @Override public - void publish(final Object message) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, handler, handleIndex, message); + try { + invocation.invoke(listener, handler, handleIndex, message); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message)); + } } + + return head != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2) throws Throwable { + boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, handler, handleIndex, message1, message2); + try { + invocation.invoke(listener, handler, handleIndex, message1, message2); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } + + return head != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2, final Object message3) throws Throwable { + boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2, final Object message3) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + try { + invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } + + return head != null; // true if we have something to publish to, otherwise false } } diff --git a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java index 858e716..fe89266 100644 --- a/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java +++ b/src/dorkbox/messagebus/subscription/asm/SubscriptionAsmWeak.java @@ -39,6 +39,8 @@ package dorkbox.messagebus.subscription.asm; import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; @@ -90,7 +92,7 @@ class SubscriptionAsmWeak extends Subscription> { @Override public - Entry> createEntry(final Object listener, final Entry head) { + Entry> createEntry(final Object listener, final Entry> head) { return new Entry>(new WeakReference(listener), head); } @@ -114,12 +116,13 @@ class SubscriptionAsmWeak extends Subscription> { @Override public - void publish(final Object message) throws Throwable { + boolean publish(final ErrorHandler errorHandler,final Object message) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -134,18 +137,28 @@ class SubscriptionAsmWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, handler, handleIndex, message); + try { + invocation.invoke(listener, handler, handleIndex, message); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2) throws Throwable { + boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -160,18 +173,28 @@ class SubscriptionAsmWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, handler, handleIndex, message1, message2); + try { + invocation.invoke(listener, handler, handleIndex, message1, message2); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2, final Object message3) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) { final MethodAccess handler = this.handlerAccess; final int handleIndex = this.methodIndex; final AsmInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -186,8 +209,17 @@ class SubscriptionAsmWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + try { + invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @SuppressWarnings("unchecked") diff --git a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java index 3d4db10..89d5638 100644 --- a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java +++ b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionStrong.java @@ -38,6 +38,8 @@ package dorkbox.messagebus.subscription.reflection; import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; @@ -80,55 +82,82 @@ class SubscriptionReflectionStrong extends Subscription { @Override public - Entry createEntry(final Object listener, final Entry head) { - return new Entry(listener, (Entry)head); + Entry createEntry(final Object listener, final Entry head) { + return new Entry(listener, head); } @Override public - void publish(final Object message) throws Throwable { + boolean publish(final ErrorHandler errorHandler,final Object message) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, method, message); + try { + invocation.invoke(listener, method, message); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message)); + } } + + return head != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, method, message1, message2); + try { + invocation.invoke(listener, method, message1, message2); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } + + return head != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2, final Object message3) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry current = headREF.get(this); + Entry head = headREF.get(this); + Entry current = head; Object listener; while (current != null) { listener = current.getValue(); current = current.next(); - invocation.invoke(listener, method, message1, message2, message3); + try { + invocation.invoke(listener, method, message1, message2, message3); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } + + return head != null; // true if we have something to publish to, otherwise false } } diff --git a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java index 78df298..4333ca9 100644 --- a/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java +++ b/src/dorkbox/messagebus/subscription/reflection/SubscriptionReflectionWeak.java @@ -38,6 +38,8 @@ package dorkbox.messagebus.subscription.reflection; import dorkbox.messagebus.common.MessageHandler; +import dorkbox.messagebus.error.ErrorHandler; +import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Subscription; @@ -83,7 +85,7 @@ class SubscriptionReflectionWeak extends Subscription> { @Override public - Entry> createEntry(final Object listener, final Entry head) { + Entry> createEntry(final Object listener, final Entry> head) { return new Entry>(new WeakReference(listener), head); } @@ -107,11 +109,12 @@ class SubscriptionReflectionWeak extends Subscription> { @Override public - void publish(final Object message) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -126,17 +129,27 @@ class SubscriptionReflectionWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, method, message); + try { + invocation.invoke(listener, method, message); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -151,17 +164,27 @@ class SubscriptionReflectionWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, method, message1, message2); + try { + invocation.invoke(listener, method, message1, message2); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @Override public - void publish(final Object message1, final Object message2, final Object message3) throws Throwable { + boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) { final Method method = this.method; final ReflectionInvocation invocation = this.invocation; - Entry> current = cast(headREF.get(this)); + Entry> head = cast(headREF.get(this)); + Entry> current = head; Object listener; while (current != null) { listener = current.getValue().get(); @@ -176,8 +199,17 @@ class SubscriptionReflectionWeak extends Subscription> { } current = current.next(); - invocation.invoke(listener, method, message1, message2, message3); + try { + invocation.invoke(listener, method, message1, message2, message3); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } + + // because the value can be GC'd at any time, this is the best guess possible + return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false } @SuppressWarnings("unchecked") diff --git a/src/dorkbox/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/messagebus/synchrony/AsyncABQ.java index d92add1..e581e7b 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncABQ.java +++ b/src/dorkbox/messagebus/synchrony/AsyncABQ.java @@ -28,6 +28,10 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.locks.LockSupport; /** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + * * This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it * generates garbage (while the disruptor version does not). * @@ -41,6 +45,7 @@ class AsyncABQ implements Synchrony { private final ArrayBlockingQueue dispatchQueue; private final Collection threads; private final Collection shutdown; + private final ErrorHandler errorHandler; /** * Notifies the consumers during shutdown that it's on purpose. @@ -50,6 +55,7 @@ class AsyncABQ implements Synchrony { public AsyncABQ(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) { + this.errorHandler = errorHandler; this.dispatchQueue = new ArrayBlockingQueue(1024); @@ -87,53 +93,57 @@ class AsyncABQ implements Synchrony { @SuppressWarnings("Duplicates") private void process(final ArrayBlockingQueue queue, final Synchrony sync, final ErrorHandler errorHandler) { - MessageHolder event = null; + MessageHolder event; + int messageType = MessageType.ONE; - Subscription[] subscriptions; + Subscription[] subs; + Subscription[] superSubs; Object message1 = null; Object message2 = null; Object message3 = null; try { event = queue.take(); + messageType = event.type; - subscriptions = event.subscriptions; + subs = event.subs; + superSubs = event.superSubs; message1 = event.message1; message2 = event.message2; message3 = event.message3; switch (messageType) { case MessageType.ONE: { - sync.publish(subscriptions, message1); + sync.publish(subs, superSubs, message1); return; } case MessageType.TWO: { - sync.publish(subscriptions, message1, message2); + sync.publish(subs, superSubs, message1, message2); return; } case MessageType.THREE: { - sync.publish(subscriptions, message1, message2, message3); + sync.publish(subs, superSubs, message1, message2, message3); //noinspection UnnecessaryReturnStatement return; } } - } catch (Throwable e) { - if (event != null && !this.shuttingDown) { + } catch (InterruptedException e) { + if (!this.shuttingDown) { switch (messageType) { case MessageType.ONE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1)); return; } case MessageType.TWO: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1, message2)); return; } case MessageType.THREE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1, message2, message3)); //noinspection UnnecessaryReturnStatement @@ -146,41 +156,62 @@ class AsyncABQ implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { MessageHolder take = new MessageHolder(); take.type = MessageType.ONE; - take.subscriptions = subscriptions; + take.subs = subscriptions; + take.superSubs = superSubscriptions; take.message1 = message1; - this.dispatchQueue.put(take); + try { + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1)); + } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { MessageHolder take = new MessageHolder(); take.type = MessageType.TWO; - take.subscriptions = subscriptions; + take.subs = subscriptions; + take.superSubs = superSubscriptions; take.message1 = message1; take.message2 = message2; - this.dispatchQueue.put(take); + try { + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { MessageHolder take = new MessageHolder(); take.type = MessageType.THREE; - take.subscriptions = subscriptions; + take.subs = subscriptions; + take.superSubs = superSubscriptions; take.message1 = message1; take.message2 = message2; take.message3 = message3; - this.dispatchQueue.put(take); + try { + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } @Override diff --git a/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java b/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java index f63e3ce..90f808b 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java +++ b/src/dorkbox/messagebus/synchrony/AsyncABQ_noGc.java @@ -28,6 +28,10 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.locks.LockSupport; /** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + * * This is similar in behavior to the disruptor in that it does not generate garbage, however the downside of this implementation is it is * slow, but faster than other messagebus implementations. * @@ -45,6 +49,7 @@ class AsyncABQ_noGc implements Synchrony { private final Collection threads; private final Collection shutdown; + private final ErrorHandler errorHandler; /** * Notifies the consumers during shutdown that it's on purpose. @@ -54,6 +59,7 @@ class AsyncABQ_noGc implements Synchrony { public AsyncABQ_noGc(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) { + this.errorHandler = errorHandler; this.dispatchQueue = new ArrayBlockingQueue(1024); this.gcQueue = new ArrayBlockingQueue(1024); @@ -103,17 +109,21 @@ class AsyncABQ_noGc implements Synchrony { final Synchrony sync, final ErrorHandler errorHandler) { - MessageHolder event = null; + MessageHolder event; + int messageType = MessageType.ONE; - Subscription[] subscriptions; + Subscription[] subs; + Subscription[] superSubs; Object message1 = null; Object message2 = null; Object message3 = null; try { event = queue.take(); + messageType = event.type; - subscriptions = event.subscriptions; + subs = event.subs; + superSubs = event.superSubs; message1 = event.message1; message2 = event.message2; message3 = event.message3; @@ -122,36 +132,36 @@ class AsyncABQ_noGc implements Synchrony { switch (messageType) { case MessageType.ONE: { - sync.publish(subscriptions, message1); + sync.publish(subs, superSubs, message1); return; } case MessageType.TWO: { - sync.publish(subscriptions, message1, message2); + sync.publish(subs, superSubs, message1, message2); return; } case MessageType.THREE: { - sync.publish(subscriptions, message1, message2, message3); + sync.publish(subs, superSubs, message1, message2, message3); //noinspection UnnecessaryReturnStatement return; } } - } catch (Throwable e) { - if (event != null && !this.shuttingDown) { + } catch (InterruptedException e) { + if (!this.shuttingDown) { switch (messageType) { case MessageType.ONE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1)); return; } case MessageType.TWO: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1, message2)); return; } case MessageType.THREE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.") .setCause(e) .setPublishedObject(message1, message2, message3)); //noinspection UnnecessaryReturnStatement @@ -164,41 +174,61 @@ class AsyncABQ_noGc implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { - MessageHolder take = gcQueue.take(); + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { + try { + MessageHolder take = gcQueue.take(); - take.type = MessageType.ONE; - take.subscriptions = subscriptions; - take.message1 = message1; + take.type = MessageType.ONE; + take.subs = subscriptions; + take.message1 = message1; - this.dispatchQueue.put(take); + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1)); + } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { - MessageHolder take = gcQueue.take(); + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { + try { + MessageHolder take = gcQueue.take(); - take.type = MessageType.TWO; - take.subscriptions = subscriptions; - take.message1 = message1; - take.message2 = message2; + take.type = MessageType.TWO; + take.subs = subscriptions; + take.superSubs = superSubscriptions; + take.message1 = message1; + take.message2 = message2; - this.dispatchQueue.put(take); + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1, message2)); + } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { - MessageHolder take = gcQueue.take(); + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { + try { + MessageHolder take = gcQueue.take(); - take.type = MessageType.THREE; - take.subscriptions = subscriptions; - take.message1 = message1; - take.message2 = message2; - take.message3 = message3; + take.type = MessageType.THREE; + take.subs = subscriptions; + take.superSubs = superSubscriptions; + take.message1 = message1; + take.message2 = message2; + take.message3 = message3; - this.dispatchQueue.put(take); + this.dispatchQueue.put(take); + } catch (InterruptedException e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } @Override diff --git a/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java index 42936ac..b791e41 100644 --- a/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java +++ b/src/dorkbox/messagebus/synchrony/AsyncDisruptor.java @@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + * + * * @author dorkbox, llc Date: 2/3/16 */ public final @@ -62,7 +67,7 @@ class AsyncDisruptor implements Synchrony { // setup the work handlers handlers = new MessageHandler[numberOfThreads]; for (int i = 0; i < handlers.length; i++) { - handlers[i] = new MessageHandler(syncPublication, errorHandler); // exactly one per thread is used + handlers[i] = new MessageHandler(syncPublication); // exactly one per thread is used } @@ -121,12 +126,13 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); job.type = MessageType.ONE; - job.subscriptions = subscriptions; + job.subs = subscriptions; + job.superSubs = superSubscriptions; job.message1 = message1; ringBuffer.publish(seq); @@ -134,12 +140,13 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); job.type = MessageType.TWO; - job.subscriptions = subscriptions; + job.subs = subscriptions; + job.superSubs = superSubscriptions; job.message1 = message1; job.message2 = message2; @@ -148,12 +155,13 @@ class AsyncDisruptor implements Synchrony { @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { long seq = ringBuffer.next(); MessageHolder job = ringBuffer.get(seq); job.type = MessageType.THREE; - job.subscriptions = subscriptions; + job.subs = subscriptions; + job.superSubs = superSubscriptions; job.message1 = message1; job.message3 = message2; job.message2 = message3; diff --git a/src/dorkbox/messagebus/synchrony/MessageHolder.java b/src/dorkbox/messagebus/synchrony/MessageHolder.java index 209308b..450ecf4 100644 --- a/src/dorkbox/messagebus/synchrony/MessageHolder.java +++ b/src/dorkbox/messagebus/synchrony/MessageHolder.java @@ -24,7 +24,8 @@ import dorkbox.messagebus.synchrony.disruptor.MessageType; public class MessageHolder { public int type = MessageType.ONE; - public Subscription[] subscriptions; + public Subscription[] subs; + public Subscription[] superSubs; public Object message1 = null; public Object message2 = null; diff --git a/src/dorkbox/messagebus/synchrony/Sync.java b/src/dorkbox/messagebus/synchrony/Sync.java index d01d2b6..cc5b7d2 100644 --- a/src/dorkbox/messagebus/synchrony/Sync.java +++ b/src/dorkbox/messagebus/synchrony/Sync.java @@ -15,39 +15,154 @@ */ package dorkbox.messagebus.synchrony; +import dorkbox.messagebus.error.DeadMessage; +import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.subscription.Subscription; +import dorkbox.messagebus.subscription.SubscriptionManager; + /** + * By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses. + * + * The exception to this rule is when checking/calling DeadMessage publication. + * * @author dorkbox, llc Date: 2/2/15 */ +@SuppressWarnings("Duplicates") public final class Sync implements Synchrony { + + private final ErrorHandler errorHandler; + private final SubscriptionManager subManager; + public - void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { + Sync(final ErrorHandler errorHandler, final SubscriptionManager subManager) { + this.errorHandler = errorHandler; + this.subManager = subManager; + } + + public + void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) { + final ErrorHandler errorHandler = this.errorHandler; + Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message1); + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) + if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { + void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2) { + final ErrorHandler errorHandler = this.errorHandler; + Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message1, message2); + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) + if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } } } @Override public - void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { + void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) { + final ErrorHandler errorHandler = this.errorHandler; + Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - sub.publish(message1, message2, message3); + int subLength; + boolean hasSubs = false; + + // Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed. + if (subscriptions != null && (subLength = subscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = subscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + // the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL) + if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) { + // even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point. + // so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered + for (int i = 0; i < subLength; i++) { + sub = superSubscriptions[i]; + hasSubs |= sub.publish(errorHandler, message1, message2, message3); + } + } + + if (!hasSubs) { + // Dead Event must EXACTLY MATCH (no subclasses) + final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(errorHandler, deadMessage); + } + } } } diff --git a/src/dorkbox/messagebus/synchrony/Synchrony.java b/src/dorkbox/messagebus/synchrony/Synchrony.java index 81fa576..134d98f 100644 --- a/src/dorkbox/messagebus/synchrony/Synchrony.java +++ b/src/dorkbox/messagebus/synchrony/Synchrony.java @@ -22,9 +22,9 @@ import dorkbox.messagebus.subscription.Subscription; */ public interface Synchrony { - void publish(final Subscription[] subscriptions, Object message1) throws Throwable; - void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ; - void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ; + void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1); + void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2); + void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2, Object message3); void shutdown(); boolean hasPendingMessages(); diff --git a/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java b/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java index ec04162..9203ea3 100644 --- a/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java +++ b/src/dorkbox/messagebus/synchrony/disruptor/MessageHandler.java @@ -17,8 +17,6 @@ package dorkbox.messagebus.synchrony.disruptor; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.WorkHandler; -import dorkbox.messagebus.error.ErrorHandler; -import dorkbox.messagebus.error.PublicationError; import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.synchrony.MessageHolder; import dorkbox.messagebus.synchrony.Synchrony; @@ -32,14 +30,12 @@ public class MessageHandler implements WorkHandler, LifecycleAware { private final Synchrony syncPublication; - private final ErrorHandler errorHandler; private final AtomicBoolean shutdown = new AtomicBoolean(false); public - MessageHandler(final Synchrony syncPublication, final ErrorHandler errorHandler) { + MessageHandler(final Synchrony syncPublication) { this.syncPublication = syncPublication; - this.errorHandler = errorHandler; } @SuppressWarnings("Duplicates") @@ -47,47 +43,22 @@ class MessageHandler implements WorkHandler, LifecycleAware { public void onEvent(final MessageHolder event) throws Exception { final int messageType = event.type; - final Subscription[] subscriptions = event.subscriptions; + final Subscription[] subs = event.subs; + final Subscription[] superSubs = event.superSubs; - try { - switch (messageType) { - case MessageType.ONE: { - syncPublication.publish(subscriptions, event.message1); - return; - } - case MessageType.TWO: { - syncPublication.publish(subscriptions, event.message1, event.message2); - return; - } - case MessageType.THREE: { - syncPublication.publish(subscriptions, event.message1, event.message2, event.message3); - //noinspection UnnecessaryReturnStatement - return; - } + switch (messageType) { + case MessageType.ONE: { + syncPublication.publish(subs, superSubs, event.message1); + return; } - } catch (Throwable e) { - switch (messageType) { - case MessageType.ONE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(event.message1)); - return; - } - case MessageType.TWO: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(event.message1, event.message2)); - return; - } - case MessageType.THREE: { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(event.message1, - event.message2, - event.message3)); - //noinspection UnnecessaryReturnStatement - return; - } + case MessageType.TWO: { + syncPublication.publish(subs, superSubs, event.message1, event.message2); + return; + } + case MessageType.THREE: { + syncPublication.publish(subs, superSubs, event.message1, event.message2, event.message3); + //noinspection UnnecessaryReturnStatement + return; } } }