Fixed issues with dead message publication - we now detect if there are listeners are actually subscribed, and publish dead messages when there are none detected. Primary subscriptions/superSubscriptions are now 'gotten' by the calling thread (DeadMessage subscriptions are 'gotten' lazily. Error handling is now improved - a single error in a collection of "same type" subscriptions will not cause all of them to abort (only the currently called method will).

This commit is contained in:
nathan 2016-02-07 19:32:24 +01:00
parent 8747aec23f
commit 49933e9219
15 changed files with 482 additions and 357 deletions

View File

@ -156,7 +156,7 @@ class MessageBus implements IMessageBus {
switch (publishMode) { switch (publishMode) {
case Exact: case Exact:
publisher = new PublisherExact(errorHandler, subscriptionManager); publisher = new PublisherExact(subscriptionManager);
break; break;
case ExactWithSuperTypes: case ExactWithSuperTypes:
@ -165,7 +165,7 @@ class MessageBus implements IMessageBus {
break; break;
} }
syncPublication = new Sync(); syncPublication = new Sync(errorHandler, subscriptionManager);
// the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue // the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue
if (useDisruptorForAsyncPublish) { if (useDisruptorForAsyncPublish) {

View File

@ -15,109 +15,52 @@
*/ */
package dorkbox.messagebus.publication; package dorkbox.messagebus.publication;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.Synchrony; import dorkbox.messagebus.synchrony.Synchrony;
/**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*/
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public public
class PublisherExact implements Publisher { class PublisherExact implements Publisher {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager; private final SubscriptionManager subManager;
public public
PublisherExact(final ErrorHandler errorHandler, final SubscriptionManager subManager) { PublisherExact(final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager; this.subManager = subManager;
} }
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1) { void publish(final Synchrony synchrony, final Object message1) {
try { final Class<?> messageClass = message1.getClass();
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null
synchrony.publish(subscriptions, null, message1);
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1));
}
} }
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1, final Object message2) { void publish(final Synchrony synchrony, final Object message1, final Object message2) {
try { final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass1 = message1.getClass(); final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
synchrony.publish(subscriptions, null, message1, message2);
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1, message2);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
try { final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass1 = message1.getClass(); final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass2 = message2.getClass(); final Class<?> messageClass3 = message3.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
synchrony.publish(subscriptions, null, message1, message2, message3);
// Run subscriptions
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
synchrony.publish(subscriptions, message1, message2, message3);
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
} }

View File

@ -15,9 +15,7 @@
*/ */
package dorkbox.messagebus.publication; package dorkbox.messagebus.publication;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler; import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager; import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.Synchrony; import dorkbox.messagebus.synchrony.Synchrony;
@ -38,132 +36,37 @@ class PublisherExactWithSuperTypes implements Publisher {
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1) { void publish(final Synchrony synchrony, final Object message1) {
try { final SubscriptionManager subManager = this.subManager;
final SubscriptionManager subManager = this.subManager; final Class<?> messageClass1 = message1.getClass();
final Class<?> message1Class = message1.getClass();
boolean hasSubs = false;
final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null
// Run subscriptions synchrony.publish(subscriptions, superSubscriptions, message1);
final Subscription[] subscriptions = subManager.getSubs(message1Class); // can return null
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1);
}
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(message1Class); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1);
}
// Run dead message subscriptions
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1));
}
} }
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1, final Object message2) { void publish(final Synchrony synchrony, final Object message1, final Object message2) {
try { final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass(); final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass(); final Class<?> messageClass2 = message2.getClass();
boolean hasSubs = false;
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
// Run subscriptions final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null synchrony.publish(subscriptions, superSubscriptions, message1, message2);
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1, message2);
}
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1, message2);
}
// Run dead message subscriptions
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
@Override @Override
public public
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) { void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
try { final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass(); final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass(); final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass(); final Class<?> messageClass3 = message3.getClass();
boolean hasSubs = false;
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
// Run subscriptions final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null synchrony.publish(subscriptions, superSubscriptions, message1, message2, message3);
if (subscriptions != null) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(subscriptions, message1, message2, message3);
}
// Run superSubscriptions
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null
if (superSubscriptions.length > 0) {
// this can only tell if we have subscribed at some point -- but not if we currently have anything (because of the async
// nature of publication
hasSubs = true;
synchrony.publish(superSubscriptions, message1, message2, message3);
}
// Run dead message subscriptions
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1, message2, message3));
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
} }

View File

@ -17,6 +17,7 @@ package dorkbox.messagebus.subscription;
import com.esotericsoftware.kryo.util.IdentityMap; import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.messagebus.common.MessageHandler; import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.error.ErrorHandler;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -84,7 +85,7 @@ class Subscription<T> {
} }
public abstract public abstract
Entry<T> createEntry(final Object listener, final Entry head); Entry<T> createEntry(final Object listener, final Entry<T> head);
/** /**
* single writer principle! * single writer principle!
@ -151,13 +152,13 @@ class Subscription<T> {
} }
public abstract public abstract
void publish(final Object message) throws Throwable; boolean publish(final ErrorHandler errorHandler, final Object message);
public abstract public abstract
void publish(final Object message1, final Object message2) throws Throwable; boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2);
public abstract public abstract
void publish(final Object message1, final Object message2, final Object message3) throws Throwable; boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3);
@Override @Override

View File

@ -39,6 +39,8 @@ package dorkbox.messagebus.subscription.asm;
import com.esotericsoftware.reflectasm.MethodAccess; import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.messagebus.common.MessageHandler; import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
@ -87,58 +89,85 @@ class SubscriptionAsmStrong extends Subscription<Object> {
@Override @Override
public public
Entry createEntry(final Object listener, final Entry head) { Entry<Object> createEntry(final Object listener, final Entry<Object> head) {
return new Entry(listener, (Entry)head); return new Entry<Object>(listener, head);
} }
@Override @Override
public public
void publish(final Object message) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message); try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2) throws Throwable { boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2); try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable { boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2, final Object message3) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2, message3); try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
} }

View File

@ -39,6 +39,8 @@ package dorkbox.messagebus.subscription.asm;
import com.esotericsoftware.reflectasm.MethodAccess; import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.messagebus.common.MessageHandler; import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
@ -90,7 +92,7 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
@Override @Override
public public
Entry<WeakReference<Object>> createEntry(final Object listener, final Entry head) { Entry<WeakReference<Object>> createEntry(final Object listener, final Entry<WeakReference<Object>> head) {
return new Entry<WeakReference<Object>>(new WeakReference<Object>(listener), head); return new Entry<WeakReference<Object>>(new WeakReference<Object>(listener), head);
} }
@ -114,12 +116,13 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
@Override @Override
public public
void publish(final Object message) throws Throwable { boolean publish(final ErrorHandler errorHandler,final Object message) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -134,18 +137,28 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message); try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2) throws Throwable { boolean publish(final ErrorHandler errorHandler,final Object message1, final Object message2) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -160,18 +173,28 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2); try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) {
final MethodAccess handler = this.handlerAccess; final MethodAccess handler = this.handlerAccess;
final int handleIndex = this.methodIndex; final int handleIndex = this.methodIndex;
final AsmInvocation invocation = this.invocation; final AsmInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -186,8 +209,17 @@ class SubscriptionAsmWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, handler, handleIndex, message1, message2, message3); try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -38,6 +38,8 @@
package dorkbox.messagebus.subscription.reflection; package dorkbox.messagebus.subscription.reflection;
import dorkbox.messagebus.common.MessageHandler; import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
@ -80,55 +82,82 @@ class SubscriptionReflectionStrong extends Subscription<Object> {
@Override @Override
public public
Entry createEntry(final Object listener, final Entry head) { Entry<Object> createEntry(final Object listener, final Entry<Object> head) {
return new Entry(listener, (Entry)head); return new Entry<Object>(listener, head);
} }
@Override @Override
public public
void publish(final Object message) throws Throwable { boolean publish(final ErrorHandler errorHandler,final Object message) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, method, message); try {
invocation.invoke(listener, method, message);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, method, message1, message2); try {
invocation.invoke(listener, method, message1, message2);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry current = headREF.get(this); Entry head = headREF.get(this);
Entry current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
invocation.invoke(listener, method, message1, message2, message3); try {
invocation.invoke(listener, method, message1, message2, message3);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
return head != null; // true if we have something to publish to, otherwise false
} }
} }

View File

@ -38,6 +38,8 @@
package dorkbox.messagebus.subscription.reflection; package dorkbox.messagebus.subscription.reflection;
import dorkbox.messagebus.common.MessageHandler; import dorkbox.messagebus.common.MessageHandler;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Entry; import dorkbox.messagebus.subscription.Entry;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
@ -83,7 +85,7 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
@Override @Override
public public
Entry<WeakReference<Object>> createEntry(final Object listener, final Entry head) { Entry<WeakReference<Object>> createEntry(final Object listener, final Entry<WeakReference<Object>> head) {
return new Entry<WeakReference<Object>>(new WeakReference<Object>(listener), head); return new Entry<WeakReference<Object>>(new WeakReference<Object>(listener), head);
} }
@ -107,11 +109,12 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
@Override @Override
public public
void publish(final Object message) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -126,17 +129,27 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, method, message); try {
invocation.invoke(listener, method, message);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -151,17 +164,27 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, method, message1, message2); try {
invocation.invoke(listener, method, message1, message2);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@Override @Override
public public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable { boolean publish(final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3) {
final Method method = this.method; final Method method = this.method;
final ReflectionInvocation invocation = this.invocation; final ReflectionInvocation invocation = this.invocation;
Entry<WeakReference<Object>> current = cast(headREF.get(this)); Entry<WeakReference<Object>> head = cast(headREF.get(this));
Entry<WeakReference<Object>> current = head;
Object listener; Object listener;
while (current != null) { while (current != null) {
listener = current.getValue().get(); listener = current.getValue().get();
@ -176,8 +199,17 @@ class SubscriptionReflectionWeak extends Subscription<WeakReference<Object>> {
} }
current = current.next(); current = current.next();
invocation.invoke(listener, method, message1, message2, message3); try {
invocation.invoke(listener, method, message1, message2, message3);
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
// because the value can be GC'd at any time, this is the best guess possible
return head != null && head.getValue() != null; // true if we have something to publish to, otherwise false
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -28,6 +28,10 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
* This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it * This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it
* generates garbage (while the disruptor version does not). * generates garbage (while the disruptor version does not).
* *
@ -41,6 +45,7 @@ class AsyncABQ implements Synchrony {
private final ArrayBlockingQueue<MessageHolder> dispatchQueue; private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
private final Collection<Thread> threads; private final Collection<Thread> threads;
private final Collection<Boolean> shutdown; private final Collection<Boolean> shutdown;
private final ErrorHandler errorHandler;
/** /**
* Notifies the consumers during shutdown that it's on purpose. * Notifies the consumers during shutdown that it's on purpose.
@ -50,6 +55,7 @@ class AsyncABQ implements Synchrony {
public public
AsyncABQ(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) { AsyncABQ(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) {
this.errorHandler = errorHandler;
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024); this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
@ -87,53 +93,57 @@ class AsyncABQ implements Synchrony {
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
private private
void process(final ArrayBlockingQueue<MessageHolder> queue, final Synchrony sync, final ErrorHandler errorHandler) { void process(final ArrayBlockingQueue<MessageHolder> queue, final Synchrony sync, final ErrorHandler errorHandler) {
MessageHolder event = null; MessageHolder event;
int messageType = MessageType.ONE; int messageType = MessageType.ONE;
Subscription[] subscriptions; Subscription[] subs;
Subscription[] superSubs;
Object message1 = null; Object message1 = null;
Object message2 = null; Object message2 = null;
Object message3 = null; Object message3 = null;
try { try {
event = queue.take(); event = queue.take();
messageType = event.type; messageType = event.type;
subscriptions = event.subscriptions; subs = event.subs;
superSubs = event.superSubs;
message1 = event.message1; message1 = event.message1;
message2 = event.message2; message2 = event.message2;
message3 = event.message3; message3 = event.message3;
switch (messageType) { switch (messageType) {
case MessageType.ONE: { case MessageType.ONE: {
sync.publish(subscriptions, message1); sync.publish(subs, superSubs, message1);
return; return;
} }
case MessageType.TWO: { case MessageType.TWO: {
sync.publish(subscriptions, message1, message2); sync.publish(subs, superSubs, message1, message2);
return; return;
} }
case MessageType.THREE: { case MessageType.THREE: {
sync.publish(subscriptions, message1, message2, message3); sync.publish(subs, superSubs, message1, message2, message3);
//noinspection UnnecessaryReturnStatement //noinspection UnnecessaryReturnStatement
return; return;
} }
} }
} catch (Throwable e) { } catch (InterruptedException e) {
if (event != null && !this.shuttingDown) { if (!this.shuttingDown) {
switch (messageType) { switch (messageType) {
case MessageType.ONE: { case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1)); .setPublishedObject(message1));
return; return;
} }
case MessageType.TWO: { case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1, message2)); .setPublishedObject(message1, message2));
return; return;
} }
case MessageType.THREE: { case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1, message2, message3)); .setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement //noinspection UnnecessaryReturnStatement
@ -146,41 +156,62 @@ class AsyncABQ implements Synchrony {
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
MessageHolder take = new MessageHolder(); MessageHolder take = new MessageHolder();
take.type = MessageType.ONE; take.type = MessageType.ONE;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1; take.message1 = message1;
this.dispatchQueue.put(take); try {
this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1));
}
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
MessageHolder take = new MessageHolder(); MessageHolder take = new MessageHolder();
take.type = MessageType.TWO; take.type = MessageType.TWO;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1; take.message1 = message1;
take.message2 = message2; take.message2 = message2;
this.dispatchQueue.put(take); try {
this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
MessageHolder take = new MessageHolder(); MessageHolder take = new MessageHolder();
take.type = MessageType.THREE; take.type = MessageType.THREE;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1; take.message1 = message1;
take.message2 = message2; take.message2 = message2;
take.message3 = message3; take.message3 = message3;
this.dispatchQueue.put(take); try {
this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
@Override @Override

View File

@ -28,6 +28,10 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
* This is similar in behavior to the disruptor in that it does not generate garbage, however the downside of this implementation is it is * This is similar in behavior to the disruptor in that it does not generate garbage, however the downside of this implementation is it is
* slow, but faster than other messagebus implementations. * slow, but faster than other messagebus implementations.
* *
@ -45,6 +49,7 @@ class AsyncABQ_noGc implements Synchrony {
private final Collection<Thread> threads; private final Collection<Thread> threads;
private final Collection<Boolean> shutdown; private final Collection<Boolean> shutdown;
private final ErrorHandler errorHandler;
/** /**
* Notifies the consumers during shutdown that it's on purpose. * Notifies the consumers during shutdown that it's on purpose.
@ -54,6 +59,7 @@ class AsyncABQ_noGc implements Synchrony {
public public
AsyncABQ_noGc(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) { AsyncABQ_noGc(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) {
this.errorHandler = errorHandler;
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024); this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
this.gcQueue = new ArrayBlockingQueue<MessageHolder>(1024); this.gcQueue = new ArrayBlockingQueue<MessageHolder>(1024);
@ -103,17 +109,21 @@ class AsyncABQ_noGc implements Synchrony {
final Synchrony sync, final Synchrony sync,
final ErrorHandler errorHandler) { final ErrorHandler errorHandler) {
MessageHolder event = null; MessageHolder event;
int messageType = MessageType.ONE; int messageType = MessageType.ONE;
Subscription[] subscriptions; Subscription[] subs;
Subscription[] superSubs;
Object message1 = null; Object message1 = null;
Object message2 = null; Object message2 = null;
Object message3 = null; Object message3 = null;
try { try {
event = queue.take(); event = queue.take();
messageType = event.type; messageType = event.type;
subscriptions = event.subscriptions; subs = event.subs;
superSubs = event.superSubs;
message1 = event.message1; message1 = event.message1;
message2 = event.message2; message2 = event.message2;
message3 = event.message3; message3 = event.message3;
@ -122,36 +132,36 @@ class AsyncABQ_noGc implements Synchrony {
switch (messageType) { switch (messageType) {
case MessageType.ONE: { case MessageType.ONE: {
sync.publish(subscriptions, message1); sync.publish(subs, superSubs, message1);
return; return;
} }
case MessageType.TWO: { case MessageType.TWO: {
sync.publish(subscriptions, message1, message2); sync.publish(subs, superSubs, message1, message2);
return; return;
} }
case MessageType.THREE: { case MessageType.THREE: {
sync.publish(subscriptions, message1, message2, message3); sync.publish(subs, superSubs, message1, message2, message3);
//noinspection UnnecessaryReturnStatement //noinspection UnnecessaryReturnStatement
return; return;
} }
} }
} catch (Throwable e) { } catch (InterruptedException e) {
if (event != null && !this.shuttingDown) { if (!this.shuttingDown) {
switch (messageType) { switch (messageType) {
case MessageType.ONE: { case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1)); .setPublishedObject(message1));
return; return;
} }
case MessageType.TWO: { case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1, message2)); .setPublishedObject(message1, message2));
return; return;
} }
case MessageType.THREE: { case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message dequeue.")
.setCause(e) .setCause(e)
.setPublishedObject(message1, message2, message3)); .setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement //noinspection UnnecessaryReturnStatement
@ -164,41 +174,61 @@ class AsyncABQ_noGc implements Synchrony {
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
MessageHolder take = gcQueue.take(); try {
MessageHolder take = gcQueue.take();
take.type = MessageType.ONE; take.type = MessageType.ONE;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.message1 = message1; take.message1 = message1;
this.dispatchQueue.put(take); this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1));
}
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
MessageHolder take = gcQueue.take(); try {
MessageHolder take = gcQueue.take();
take.type = MessageType.TWO; take.type = MessageType.TWO;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.message1 = message1; take.superSubs = superSubscriptions;
take.message2 = message2; take.message1 = message1;
take.message2 = message2;
this.dispatchQueue.put(take); this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1, message2));
}
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
MessageHolder take = gcQueue.take(); try {
MessageHolder take = gcQueue.take();
take.type = MessageType.THREE; take.type = MessageType.THREE;
take.subscriptions = subscriptions; take.subs = subscriptions;
take.message1 = message1; take.superSubs = superSubscriptions;
take.message2 = message2; take.message1 = message1;
take.message3 = message3; take.message2 = message2;
take.message3 = message3;
this.dispatchQueue.put(take); this.dispatchQueue.put(take);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} }
@Override @Override

View File

@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
*
* @author dorkbox, llc Date: 2/3/16 * @author dorkbox, llc Date: 2/3/16
*/ */
public final public final
@ -62,7 +67,7 @@ class AsyncDisruptor implements Synchrony {
// setup the work handlers // setup the work handlers
handlers = new MessageHandler[numberOfThreads]; handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) { for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(syncPublication, errorHandler); // exactly one per thread is used handlers[i] = new MessageHandler(syncPublication); // exactly one per thread is used
} }
@ -121,12 +126,13 @@ class AsyncDisruptor implements Synchrony {
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
long seq = ringBuffer.next(); long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq); MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE; job.type = MessageType.ONE;
job.subscriptions = subscriptions; job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.message1 = message1; job.message1 = message1;
ringBuffer.publish(seq); ringBuffer.publish(seq);
@ -134,12 +140,13 @@ class AsyncDisruptor implements Synchrony {
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
long seq = ringBuffer.next(); long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq); MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.TWO; job.type = MessageType.TWO;
job.subscriptions = subscriptions; job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.message1 = message1; job.message1 = message1;
job.message2 = message2; job.message2 = message2;
@ -148,12 +155,13 @@ class AsyncDisruptor implements Synchrony {
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
long seq = ringBuffer.next(); long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq); MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.THREE; job.type = MessageType.THREE;
job.subscriptions = subscriptions; job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.message1 = message1; job.message1 = message1;
job.message3 = message2; job.message3 = message2;
job.message2 = message3; job.message2 = message3;

View File

@ -24,7 +24,8 @@ import dorkbox.messagebus.synchrony.disruptor.MessageType;
public public
class MessageHolder { class MessageHolder {
public int type = MessageType.ONE; public int type = MessageType.ONE;
public Subscription[] subscriptions; public Subscription[] subs;
public Subscription[] superSubs;
public Object message1 = null; public Object message1 = null;
public Object message2 = null; public Object message2 = null;

View File

@ -15,39 +15,154 @@
*/ */
package dorkbox.messagebus.synchrony; package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
/** /**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
*/ */
@SuppressWarnings("Duplicates")
public final public final
class Sync implements Synchrony { class Sync implements Synchrony {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager;
public public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { Sync(final ErrorHandler errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager;
}
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub; Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { int subLength;
sub = subscriptions[i]; boolean hasSubs = false;
sub.publish(message1);
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
} }
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable { void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub; Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { int subLength;
sub = subscriptions[i]; boolean hasSubs = false;
sub.publish(message1, message2);
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
} }
} }
@Override @Override
public public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable { void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub; Subscription sub;
for (int i = 0; i < subscriptions.length; i++) { int subLength;
sub = subscriptions[i]; boolean hasSubs = false;
sub.publish(message1, message2, message3);
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
} }
} }

View File

@ -22,9 +22,9 @@ import dorkbox.messagebus.subscription.Subscription;
*/ */
public public
interface Synchrony { interface Synchrony {
void publish(final Subscription[] subscriptions, Object message1) throws Throwable; void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1);
void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ; void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2);
void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ; void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2, Object message3);
void shutdown(); void shutdown();
boolean hasPendingMessages(); boolean hasPendingMessages();

View File

@ -17,8 +17,6 @@ package dorkbox.messagebus.synchrony.disruptor;
import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkHandler;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Subscription; import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.synchrony.MessageHolder; import dorkbox.messagebus.synchrony.MessageHolder;
import dorkbox.messagebus.synchrony.Synchrony; import dorkbox.messagebus.synchrony.Synchrony;
@ -32,14 +30,12 @@ public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware { class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final Synchrony syncPublication; private final Synchrony syncPublication;
private final ErrorHandler errorHandler;
private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shutdown = new AtomicBoolean(false);
public public
MessageHandler(final Synchrony syncPublication, final ErrorHandler errorHandler) { MessageHandler(final Synchrony syncPublication) {
this.syncPublication = syncPublication; this.syncPublication = syncPublication;
this.errorHandler = errorHandler;
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -47,47 +43,22 @@ class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
public public
void onEvent(final MessageHolder event) throws Exception { void onEvent(final MessageHolder event) throws Exception {
final int messageType = event.type; final int messageType = event.type;
final Subscription[] subscriptions = event.subscriptions; final Subscription[] subs = event.subs;
final Subscription[] superSubs = event.superSubs;
try { switch (messageType) {
switch (messageType) { case MessageType.ONE: {
case MessageType.ONE: { syncPublication.publish(subs, superSubs, event.message1);
syncPublication.publish(subscriptions, event.message1); return;
return;
}
case MessageType.TWO: {
syncPublication.publish(subscriptions, event.message1, event.message2);
return;
}
case MessageType.THREE: {
syncPublication.publish(subscriptions, event.message1, event.message2, event.message3);
//noinspection UnnecessaryReturnStatement
return;
}
} }
} catch (Throwable e) { case MessageType.TWO: {
switch (messageType) { syncPublication.publish(subs, superSubs, event.message1, event.message2);
case MessageType.ONE: { return;
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") }
.setCause(e) case MessageType.THREE: {
.setPublishedObject(event.message1)); syncPublication.publish(subs, superSubs, event.message1, event.message2, event.message3);
return; //noinspection UnnecessaryReturnStatement
} return;
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(event.message1, event.message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(event.message1,
event.message2,
event.message3));
//noinspection UnnecessaryReturnStatement
return;
}
} }
} }
} }