diff --git a/src/main/java/net/engio/mbassy/AbstractMessageBus.java b/src/main/java/net/engio/mbassy/AbstractMessageBus.java index bf17333..2a4500b 100644 --- a/src/main/java/net/engio/mbassy/AbstractMessageBus.java +++ b/src/main/java/net/engio/mbassy/AbstractMessageBus.java @@ -1,7 +1,7 @@ package net.engio.mbassy; import net.engio.mbassy.common.ReflectionUtils; -import net.engio.mbassy.dispatch.MessagingContext; +import net.engio.mbassy.dispatch.SubscriptionContext; import net.engio.mbassy.listener.MessageHandlerMetadata; import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.subscription.Subscription; @@ -122,7 +122,7 @@ public abstract class AbstractMessageBus for (MessageHandlerMetadata messageHandler : messageHandlers) { // create the subscription Subscription subscription = subscriptionFactory - .createSubscription(new MessagingContext(this, messageHandler)); + .createSubscription(new SubscriptionContext(this, messageHandler)); subscription.subscribe(listener); subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions) diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java index 78d359d..20ff8bd 100644 --- a/src/main/java/net/engio/mbassy/MBassador.java +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -27,9 +27,9 @@ public class MBassador extends AbstractMessageBus> if (subscriptions == null || subscriptions.isEmpty()) { // Dead Event subscriptions = getSubscriptionsByMessageType(DeadEvent.class); - return MessagePublication.Create(subscriptions, new DeadEvent(message)); + return MessagePublication.Create(this, subscriptions, new DeadEvent(message)); } - else return MessagePublication.Create(subscriptions, message); + else return MessagePublication.Create(this, subscriptions, message); } @@ -44,23 +44,6 @@ public class MBassador extends AbstractMessageBus> try { MessagePublication publication = createMessagePublication(message); publication.execute(); - - /* - final Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); - if (subscriptions == null || subscriptions.isEmpty()) { - // publish a DeadEvent since no subscriptions could be found - final Collection deadEventSubscriptions = getSubscriptionsByMessageType(DeadEvent.class); - if (deadEventSubscriptions != null && !deadEventSubscriptions.isEmpty()) { - for (Subscription subscription : deadEventSubscriptions) { - subscription.publish(new DeadEvent(message)); - } - } - } - else{ - for (Subscription subscription : subscriptions) { - subscription.publish(message); - } - }*/ } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") diff --git a/src/main/java/net/engio/mbassy/MessagePublication.java b/src/main/java/net/engio/mbassy/MessagePublication.java index 04e7381..7a3d60e 100644 --- a/src/main/java/net/engio/mbassy/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/MessagePublication.java @@ -1,6 +1,7 @@ package net.engio.mbassy; import net.engio.mbassy.common.DeadEvent; +import net.engio.mbassy.common.FilteredEvent; import net.engio.mbassy.subscription.Subscription; import java.util.Collection; @@ -10,13 +11,16 @@ import java.util.Collection; * of the corresponding message publication process, i.e. provides information whether the * publication was successfully scheduled, is currently running etc. * + * A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to + * be used in multiple threads simultaneously . + * * @author bennidi * Date: 11/16/12 */ public class MessagePublication { - public static MessagePublication Create(Collection subscriptions, Object message){ - return new MessagePublication(subscriptions, message, State.Initial); + public static MessagePublication Create(IMessageBus bus, Collection subscriptions, Object message){ + return new MessagePublication(bus,subscriptions, message, State.Initial); } private Collection subscriptions; @@ -25,7 +29,12 @@ public class MessagePublication { private State state = State.Scheduled; - private MessagePublication(Collection subscriptions, Object message, State initialState) { + private boolean delivered = false; + + private IMessageBus bus; + + private MessagePublication(IMessageBus bus, Collection subscriptions, Object message, State initialState) { + this.bus = bus; this.subscriptions = subscriptions; this.message = message; this.state = initialState; @@ -38,9 +47,12 @@ public class MessagePublication { protected void execute(){ state = State.Running; for(Subscription sub : subscriptions){ - sub.publish(message); + sub.publish(this, message); } state = State.Finished; + if(!delivered && !isFilteredEvent() && !isDeadEvent()){ + bus.post(new FilteredEvent(message)).now(); + } } public boolean isFinished() { @@ -55,6 +67,10 @@ public class MessagePublication { return state.equals(State.Scheduled); } + public void markDelivered(){ + delivered = true; + } + public MessagePublication markScheduled(){ if(!state.equals(State.Initial)) return this; @@ -71,6 +87,10 @@ public class MessagePublication { return DeadEvent.class.isAssignableFrom(message.getClass()); } + public boolean isFilteredEvent(){ + return FilteredEvent.class.isAssignableFrom(message.getClass()); + } + private enum State{ Initial,Scheduled,Running,Finished,Error; } diff --git a/src/main/java/net/engio/mbassy/common/DeadEvent.java b/src/main/java/net/engio/mbassy/common/DeadEvent.java index 38b9988..4878646 100644 --- a/src/main/java/net/engio/mbassy/common/DeadEvent.java +++ b/src/main/java/net/engio/mbassy/common/DeadEvent.java @@ -7,15 +7,10 @@ package net.engio.mbassy.common; * @author bennidi * Date: 1/18/13 */ -public class DeadEvent { +public class DeadEvent extends PublicationEvent { - private Object event; - - public DeadEvent(Object event) { - this.event = event; + public DeadEvent(Object message) { + super(message); } - public Object getEvent() { - return event; - } } diff --git a/src/main/java/net/engio/mbassy/common/FilteredEvent.java b/src/main/java/net/engio/mbassy/common/FilteredEvent.java new file mode 100644 index 0000000..98bdddb --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/FilteredEvent.java @@ -0,0 +1,17 @@ +package net.engio.mbassy.common; + +/** + * A filtered event is published when there have been matching subscriptions for a given + * message publication but configured filters prevented the message from being delivered to + * any of the handlers. + * + * @author bennidi + * Date: 3/1/13 + */ +public class FilteredEvent extends PublicationEvent { + + + public FilteredEvent(Object event) { + super(event); + } +} diff --git a/src/main/java/net/engio/mbassy/common/PublicationEvent.java b/src/main/java/net/engio/mbassy/common/PublicationEvent.java new file mode 100644 index 0000000..cefcfae --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/PublicationEvent.java @@ -0,0 +1,21 @@ +package net.engio.mbassy.common; + +/** + * A wrapped event is created when various conditions are matched (these depend on the concrete + * (sub)type of wrapped event). + * + * @author bennidi + * Date: 3/1/13 + */ +public abstract class PublicationEvent { + + private Object event; + + public PublicationEvent(Object message) { + this.event = message; + } + + public Object getEvent() { + return event; + } +} diff --git a/src/main/java/net/engio/mbassy/dispatch/AbstractHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/AbstractHandlerInvocation.java deleted file mode 100644 index 9a9bb20..0000000 --- a/src/main/java/net/engio/mbassy/dispatch/AbstractHandlerInvocation.java +++ /dev/null @@ -1,65 +0,0 @@ -package net.engio.mbassy.dispatch; - -import net.engio.mbassy.IPublicationErrorHandler; -import net.engio.mbassy.PublicationError; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Collection; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 11/23/12 - */ -public class AbstractHandlerInvocation { - - private MessagingContext context; - - protected void handlePublicationError(PublicationError error){ - Collection handlers = getContext().getOwningBus().getRegisteredErrorHandlers(); - for(IPublicationErrorHandler handler : handlers){ - handler.handleError(error); - } - } - - protected void invokeHandler(final Object message, final Object listener, Method handler){ - try { - handler.invoke(listener, message); - }catch(IllegalAccessException e){ - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "The class or method is not accessible", - handler, listener, message)); - } - catch(IllegalArgumentException e){ - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0], - handler, listener, message)); - } - catch (InvocationTargetException e) { - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Message handler threw exception", - handler, listener, message)); - } - catch (Throwable e) { - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Unexpected exception", - handler, listener, message)); - } - } - - - public AbstractHandlerInvocation(MessagingContext context) { - this.context = context; - } - - public MessagingContext getContext() { - return context; - } -} diff --git a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java index 7d85e56..31273e0 100644 --- a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java @@ -1,6 +1,7 @@ package net.engio.mbassy.dispatch; -import java.lang.reflect.Method; +import net.engio.mbassy.MessagePublication; +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; /** * This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously @@ -8,27 +9,22 @@ import java.lang.reflect.Method; * @author bennidi * Date: 11/23/12 */ -public class AsynchronousHandlerInvocation implements IHandlerInvocation { +public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { private IHandlerInvocation delegate; public AsynchronousHandlerInvocation(IHandlerInvocation delegate) { - super(); + super(delegate.getContext()); this.delegate = delegate; } @Override - public void invoke(final Method handler, final Object listener, final Object message) { + public void invoke(final Object listener, final Object message) { getContext().getOwningBus().getExecutor().execute(new Runnable() { @Override public void run() { - delegate.invoke(handler, listener, message); + delegate.invoke(listener, message); } }); } - - @Override - public MessagingContext getContext() { - return delegate.getContext(); - } } diff --git a/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java new file mode 100644 index 0000000..621c9bc --- /dev/null +++ b/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java @@ -0,0 +1,30 @@ +package net.engio.mbassy.dispatch; + +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; + +/** + * A delegating dispatcher wraps additional logic around a given delegate. Essentially its + * an implementation of the decorator pattern. + * + * @author bennidi + * Date: 3/1/13 + */ +public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher{ + + private IMessageDispatcher delegate; + + + public DelegatingMessageDispatcher(IMessageDispatcher delegate) { + super(delegate.getContext()); + this.delegate = delegate; + } + + protected IMessageDispatcher getDelegate() { + return delegate; + } + + @Override + public IHandlerInvocation getInvocation() { + return delegate.getInvocation(); + } +} diff --git a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java index d5933e8..8df3774 100644 --- a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java @@ -1,34 +1,27 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.MessagePublication; import net.engio.mbassy.common.ConcurrentSet; import net.engio.mbassy.subscription.MessageEnvelope; /** - * Todo: Add javadoc + * The enveloped dispatcher will wrap published messages in an envelope before + * passing them to their configured dispatcher. + * + * All enveloped message handlers will have this dispatcher in their chain * * @author bennidi * Date: 12/12/12 */ -public class EnvelopedMessageDispatcher implements IMessageDispatcher { +public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher{ - private IMessageDispatcher del; public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) { - this.del = dispatcher; + super(dispatcher); } @Override - public void dispatch(Object message, ConcurrentSet listeners) { - del.dispatch(new MessageEnvelope(message), listeners); - } - - @Override - public MessagingContext getContext() { - return del.getContext(); - } - - @Override - public IHandlerInvocation getInvocation() { - return del.getInvocation(); + public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { + getDelegate().dispatch(publication, new MessageEnvelope(message), listeners); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java index b2b361d..1bd1a3a 100644 --- a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.MessagePublication; import net.engio.mbassy.common.ConcurrentSet; import net.engio.mbassy.listener.IMessageFilter; @@ -11,14 +12,12 @@ import net.engio.mbassy.listener.IMessageFilter; * @author bennidi * Date: 11/23/12 */ -public class FilteredMessageDispatcher implements IMessageDispatcher { +public class FilteredMessageDispatcher extends DelegatingMessageDispatcher { private final IMessageFilter[] filter; - private IMessageDispatcher del; - public FilteredMessageDispatcher(IMessageDispatcher dispatcher) { - this.del = dispatcher; + super(dispatcher); this.filter = dispatcher.getContext().getHandlerMetadata().getFilter(); } @@ -37,21 +36,10 @@ public class FilteredMessageDispatcher implements IMessageDispatcher { @Override - public void dispatch(Object message, ConcurrentSet listeners) { + public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { if(passesFilter(message)){ - del.dispatch(message, listeners); + getDelegate().dispatch(publication, message, listeners); } } - @Override - public MessagingContext getContext() { - return del.getContext(); - } - - @Override - public IHandlerInvocation getInvocation() { - return del.getInvocation(); - } - - } diff --git a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java index 8dd042e..c449403 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import java.lang.reflect.Method; +import net.engio.mbassy.MessagePublication; /** * A handler invocation encapsulates the logic that is used to invoke a single @@ -11,21 +11,15 @@ import java.lang.reflect.Method; * @author bennidi * Date: 11/23/12 */ -public interface IHandlerInvocation { +public interface IHandlerInvocation extends ISubscriptionContextAware { /** - * Invoke the message delivery logic of this handler invocation + * Invoke the message delivery logic of this handler * - * @param handler The method that represents the actual message handler logic of the listener * @param listener The listener that will receive the message * @param message The message to be delivered to the listener */ - public void invoke(final Method handler, final Object listener, final Object message); + public void invoke(final Object listener, final Object message); - /** - * Get the messaging context associated with this invocation - * @return - */ - public MessagingContext getContext(); } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java b/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java new file mode 100644 index 0000000..9e3260e --- /dev/null +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java @@ -0,0 +1,14 @@ +package net.engio.mbassy.dispatch; + +import net.engio.mbassy.IMessageBus; + +/** + * This interface marks components that have access to the message bus that they belong to. + * + * @author bennidi + * Date: 3/1/13 + */ +public interface IMessageBusAware { + + public IMessageBus getBus(); +} diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java index a99b48a..41f5eb2 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.MessagePublication; import net.engio.mbassy.common.ConcurrentSet; /** @@ -11,30 +12,24 @@ import net.engio.mbassy.common.ConcurrentSet; * handler invocation object associated with the dispatcher. * * Implementations if IMessageDispatcher are partially designed using decorator pattern - * such that it is possible to compose different message dispatchers to achieve more complex - * dispatch logic. + * such that it is possible to compose different message dispatchers into dispatcher chains + * to achieve more complex dispatch logic. * * @author bennidi * Date: 11/23/12 */ -public interface IMessageDispatcher { +public interface IMessageDispatcher extends ISubscriptionContextAware { /** * Delivers the given message to the given set of listeners. * Delivery may be delayed, aborted or restricted in various ways, depending * on the configuration of the dispatcher * + * @param publication The message publication that initiated the dispatch * @param message The message that should be delivered to the listeners * @param listeners The listeners that should receive the message */ - public void dispatch(Object message, ConcurrentSet listeners); - - /** - * Get the messaging context associated with this dispatcher - * - * @return - */ - public MessagingContext getContext(); + public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners); /** * Get the handler invocation that will be used to deliver the message to each diff --git a/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java b/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java new file mode 100644 index 0000000..c16ae5b --- /dev/null +++ b/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java @@ -0,0 +1,17 @@ +package net.engio.mbassy.dispatch; + +/** + * This interface marks components that have access to the subscription context. + * + * @author bennidi + * Date: 3/1/13 + */ +public interface ISubscriptionContextAware extends IMessageBusAware { + + /** + * Get the subscription context associated with this object + * + * @return + */ + public SubscriptionContext getContext(); +} diff --git a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java index 25f77c4..6c69dc1 100644 --- a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java @@ -2,7 +2,10 @@ package net.engio.mbassy.dispatch; import java.lang.reflect.Method; +import net.engio.mbassy.IMessageBus; +import net.engio.mbassy.MessagePublication; import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; /** * Standard implementation for direct, unfiltered message delivery. @@ -14,31 +17,26 @@ import net.engio.mbassy.common.ConcurrentSet; * @author bennidi * Date: 11/23/12 */ -public class MessageDispatcher implements IMessageDispatcher { - - private MessagingContext context; +public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher { private IHandlerInvocation invocation; - public MessageDispatcher(MessagingContext context, IHandlerInvocation invocation) { - this.context = context; + public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) { + super(context); this.invocation = invocation; } @Override - public void dispatch(Object message, ConcurrentSet listeners) { - Method handler = getContext().getHandlerMetadata().getHandler(); + public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { + publication.markDelivered(); for(Object listener: listeners){ - getInvocation().invoke(handler, listener, message); + getInvocation().invoke(listener, message); } } - public MessagingContext getContext() { - return context; - } - @Override public IHandlerInvocation getInvocation() { return invocation; } + } diff --git a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java index 5ce812c..ee75da5 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -1,6 +1,13 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.IPublicationErrorHandler; +import net.engio.mbassy.MessagePublication; +import net.engio.mbassy.PublicationError; +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; + +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collection; /** * Uses reflection to invoke a message handler for a given message. @@ -8,14 +15,51 @@ import java.lang.reflect.Method; * @author bennidi * Date: 11/23/12 */ -public class ReflectiveHandlerInvocation extends AbstractHandlerInvocation implements IHandlerInvocation { +public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation{ - public ReflectiveHandlerInvocation(MessagingContext context) { + public ReflectiveHandlerInvocation(SubscriptionContext context) { super(context); } + protected void handlePublicationError(PublicationError error){ + Collection handlers = getContext().getOwningBus().getRegisteredErrorHandlers(); + for(IPublicationErrorHandler handler : handlers){ + handler.handleError(error); + } + } + + protected void invokeHandler(final Object message, final Object listener, Method handler){ + try { + handler.invoke(listener, message); + }catch(IllegalAccessException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "The class or method is not accessible", + handler, listener, message)); + } + catch(IllegalArgumentException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0], + handler, listener, message)); + } + catch (InvocationTargetException e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Message handler threw exception", + handler, listener, message)); + } + catch (Throwable e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Unexpected exception", + handler, listener, message)); + } + } + @Override - public void invoke(final Method handler, final Object listener, final Object message) { - invokeHandler(message, listener, handler); + public void invoke(final Object listener, final Object message) { + invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler()); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/MessagingContext.java b/src/main/java/net/engio/mbassy/dispatch/SubscriptionContext.java similarity index 73% rename from src/main/java/net/engio/mbassy/dispatch/MessagingContext.java rename to src/main/java/net/engio/mbassy/dispatch/SubscriptionContext.java index 5a6d0bb..6926bca 100644 --- a/src/main/java/net/engio/mbassy/dispatch/MessagingContext.java +++ b/src/main/java/net/engio/mbassy/dispatch/SubscriptionContext.java @@ -4,21 +4,21 @@ import net.engio.mbassy.IMessageBus; import net.engio.mbassy.listener.MessageHandlerMetadata; /** - * The messaging context holds all data/objects that is relevant to successfully publish + * The subscription context holds all (meta)data/objects that are relevant to successfully publish * a message within a subscription. A one-to-one relation between a subscription and - * MessagingContext holds -> a messaging context is created for each distinct subscription + * subscription context holds -> a subscription context is created for each distinct subscription * that lives inside a message bus. * * @author bennidi * Date: 11/23/12 */ -public class MessagingContext { +public class SubscriptionContext { private IMessageBus owningBus; private MessageHandlerMetadata handlerMetadata; - public MessagingContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) { + public SubscriptionContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) { this.owningBus = owningBus; this.handlerMetadata = handlerMetadata; } diff --git a/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java b/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java new file mode 100644 index 0000000..e2fbaf9 --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java @@ -0,0 +1,29 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.IMessageBus; +import net.engio.mbassy.dispatch.ISubscriptionContextAware; +import net.engio.mbassy.dispatch.SubscriptionContext; + +/** + * The base implementation for subscription context aware objects (mightily obvious :) + * + * @author bennidi + * Date: 3/1/13 + */ +public class AbstractSubscriptionContextAware implements ISubscriptionContextAware{ + + private SubscriptionContext context; + + public AbstractSubscriptionContextAware(SubscriptionContext context) { + this.context = context; + } + + public SubscriptionContext getContext() { + return context; + } + + @Override + public IMessageBus getBus() { + return context.getOwningBus(); + } +} diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index aca8132..7dc9e5d 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -3,9 +3,10 @@ package net.engio.mbassy.subscription; import java.util.Comparator; import java.util.UUID; +import net.engio.mbassy.MessagePublication; import net.engio.mbassy.common.ConcurrentSet; import net.engio.mbassy.dispatch.IMessageDispatcher; -import net.engio.mbassy.dispatch.MessagingContext; +import net.engio.mbassy.dispatch.SubscriptionContext; /** * A subscription is a thread safe container for objects that contain message handlers @@ -18,9 +19,9 @@ public class Subscription { private IMessageDispatcher dispatcher; - private MessagingContext context; + private SubscriptionContext context; - public Subscription(MessagingContext context, IMessageDispatcher dispatcher) { + public Subscription(SubscriptionContext context, IMessageDispatcher dispatcher) { this.context = context; this.dispatcher = dispatcher; } @@ -31,12 +32,8 @@ public class Subscription { } - public void publish(Object message){ - dispatcher.dispatch(message, listeners); - } - - public MessagingContext getContext(){ - return context; + public void publish(MessagePublication publication, Object message){ + dispatcher.dispatch(publication, message, listeners); } public int getPriority(){ diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java index 2915bff..f0e8f53 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java @@ -1,13 +1,7 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.dispatch.AsynchronousHandlerInvocation; -import net.engio.mbassy.dispatch.EnvelopedMessageDispatcher; -import net.engio.mbassy.dispatch.FilteredMessageDispatcher; -import net.engio.mbassy.dispatch.IHandlerInvocation; -import net.engio.mbassy.dispatch.IMessageDispatcher; -import net.engio.mbassy.dispatch.MessageDispatcher; -import net.engio.mbassy.dispatch.MessagingContext; -import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; +import net.engio.mbassy.dispatch.*; +import net.engio.mbassy.dispatch.SubscriptionContext; /** * Created with IntelliJ IDEA. @@ -18,13 +12,13 @@ import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; */ public class SubscriptionFactory { - public Subscription createSubscription(MessagingContext context){ + public Subscription createSubscription(SubscriptionContext context){ IHandlerInvocation invocation = buildInvocationForHandler(context); IMessageDispatcher dispatcher = buildDispatcher(context, invocation); return new Subscription(context, dispatcher); } - protected IHandlerInvocation buildInvocationForHandler(MessagingContext context){ + protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context){ IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context); if(context.getHandlerMetadata().isAsynchronous()){ invocation = new AsynchronousHandlerInvocation(invocation); @@ -32,7 +26,7 @@ public class SubscriptionFactory { return invocation; } - protected IMessageDispatcher buildDispatcher(MessagingContext context, IHandlerInvocation invocation){ + protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation){ IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation); if(context.getHandlerMetadata().isEnveloped()){ dispatcher = new EnvelopedMessageDispatcher(dispatcher); diff --git a/src/test/java/net/engio/mbassy/DeadEventTest.java b/src/test/java/net/engio/mbassy/DeadEventTest.java index a4a7bf9..b509a59 100644 --- a/src/test/java/net/engio/mbassy/DeadEventTest.java +++ b/src/test/java/net/engio/mbassy/DeadEventTest.java @@ -7,7 +7,7 @@ import net.engio.mbassy.listener.Listener; import org.junit.Test; /** - * Verify correct behaviour in case of empty message publications + * Verify correct behaviour in case of message publications that do not have any matching subscriptions * * @author bennidi * Date: 1/18/13