added FilteredEvent, interface optimization

This commit is contained in:
benni 2013-03-01 22:59:28 +01:00
parent 676f729e08
commit eb50dbc96e
22 changed files with 263 additions and 203 deletions

View File

@ -1,7 +1,7 @@
package net.engio.mbassy;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.dispatch.MessagingContext;
import net.engio.mbassy.dispatch.SubscriptionContext;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.Subscription;
@ -122,7 +122,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
for (MessageHandlerMetadata messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = subscriptionFactory
.createSubscription(new MessagingContext(this, messageHandler));
.createSubscription(new SubscriptionContext(this, messageHandler));
subscription.subscribe(listener);
subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions)

View File

@ -27,9 +27,9 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadEvent.class);
return MessagePublication.Create(subscriptions, new DeadEvent(message));
return MessagePublication.Create(this, subscriptions, new DeadEvent(message));
}
else return MessagePublication.Create(subscriptions, message);
else return MessagePublication.Create(this, subscriptions, message);
}
@ -44,23 +44,6 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
try {
MessagePublication publication = createMessagePublication(message);
publication.execute();
/*
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null || subscriptions.isEmpty()) {
// publish a DeadEvent since no subscriptions could be found
final Collection<Subscription> deadEventSubscriptions = getSubscriptionsByMessageType(DeadEvent.class);
if (deadEventSubscriptions != null && !deadEventSubscriptions.isEmpty()) {
for (Subscription subscription : deadEventSubscriptions) {
subscription.publish(new DeadEvent(message));
}
}
}
else{
for (Subscription subscription : subscriptions) {
subscription.publish(message);
}
}*/
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")

View File

@ -1,6 +1,7 @@
package net.engio.mbassy;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.common.FilteredEvent;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -10,13 +11,16 @@ import java.util.Collection;
* of the corresponding message publication process, i.e. provides information whether the
* publication was successfully scheduled, is currently running etc.
*
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
* be used in multiple threads simultaneously .
*
* @author bennidi
* Date: 11/16/12
*/
public class MessagePublication {
public static MessagePublication Create(Collection<Subscription> subscriptions, Object message){
return new MessagePublication(subscriptions, message, State.Initial);
public static MessagePublication Create(IMessageBus bus, Collection<Subscription> subscriptions, Object message){
return new MessagePublication(bus,subscriptions, message, State.Initial);
}
private Collection<Subscription> subscriptions;
@ -25,7 +29,12 @@ public class MessagePublication {
private State state = State.Scheduled;
private MessagePublication(Collection<Subscription> subscriptions, Object message, State initialState) {
private boolean delivered = false;
private IMessageBus bus;
private MessagePublication(IMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
this.bus = bus;
this.subscriptions = subscriptions;
this.message = message;
this.state = initialState;
@ -38,9 +47,12 @@ public class MessagePublication {
protected void execute(){
state = State.Running;
for(Subscription sub : subscriptions){
sub.publish(message);
sub.publish(this, message);
}
state = State.Finished;
if(!delivered && !isFilteredEvent() && !isDeadEvent()){
bus.post(new FilteredEvent(message)).now();
}
}
public boolean isFinished() {
@ -55,6 +67,10 @@ public class MessagePublication {
return state.equals(State.Scheduled);
}
public void markDelivered(){
delivered = true;
}
public MessagePublication markScheduled(){
if(!state.equals(State.Initial))
return this;
@ -71,6 +87,10 @@ public class MessagePublication {
return DeadEvent.class.isAssignableFrom(message.getClass());
}
public boolean isFilteredEvent(){
return FilteredEvent.class.isAssignableFrom(message.getClass());
}
private enum State{
Initial,Scheduled,Running,Finished,Error;
}

View File

@ -7,15 +7,10 @@ package net.engio.mbassy.common;
* @author bennidi
* Date: 1/18/13
*/
public class DeadEvent {
public class DeadEvent extends PublicationEvent {
private Object event;
public DeadEvent(Object event) {
this.event = event;
public DeadEvent(Object message) {
super(message);
}
public Object getEvent() {
return event;
}
}

View File

@ -0,0 +1,17 @@
package net.engio.mbassy.common;
/**
* A filtered event is published when there have been matching subscriptions for a given
* message publication but configured filters prevented the message from being delivered to
* any of the handlers.
*
* @author bennidi
* Date: 3/1/13
*/
public class FilteredEvent extends PublicationEvent {
public FilteredEvent(Object event) {
super(event);
}
}

View File

@ -0,0 +1,21 @@
package net.engio.mbassy.common;
/**
* A wrapped event is created when various conditions are matched (these depend on the concrete
* (sub)type of wrapped event).
*
* @author bennidi
* Date: 3/1/13
*/
public abstract class PublicationEvent {
private Object event;
public PublicationEvent(Object message) {
this.event = message;
}
public Object getEvent() {
return event;
}
}

View File

@ -1,65 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 11/23/12
*/
public class AbstractHandlerInvocation {
private MessagingContext context;
protected void handlePublicationError(PublicationError error){
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
for(IPublicationErrorHandler handler : handlers){
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener, Method handler){
try {
handler.invoke(listener, message);
}catch(IllegalAccessException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
handler, listener, message));
}
catch(IllegalArgumentException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
}
catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
handler, listener, message));
}
catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handler, listener, message));
}
}
public AbstractHandlerInvocation(MessagingContext context) {
this.context = context;
}
public MessagingContext getContext() {
return context;
}
}

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.dispatch;
import java.lang.reflect.Method;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
* This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously
@ -8,27 +9,22 @@ import java.lang.reflect.Method;
* @author bennidi
* Date: 11/23/12
*/
public class AsynchronousHandlerInvocation implements IHandlerInvocation {
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
private IHandlerInvocation delegate;
public AsynchronousHandlerInvocation(IHandlerInvocation delegate) {
super();
super(delegate.getContext());
this.delegate = delegate;
}
@Override
public void invoke(final Method handler, final Object listener, final Object message) {
public void invoke(final Object listener, final Object message) {
getContext().getOwningBus().getExecutor().execute(new Runnable() {
@Override
public void run() {
delegate.invoke(handler, listener, message);
delegate.invoke(listener, message);
}
});
}
@Override
public MessagingContext getContext() {
return delegate.getContext();
}
}

View File

@ -0,0 +1,30 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
* A delegating dispatcher wraps additional logic around a given delegate. Essentially its
* an implementation of the decorator pattern.
*
* @author bennidi
* Date: 3/1/13
*/
public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher{
private IMessageDispatcher delegate;
public DelegatingMessageDispatcher(IMessageDispatcher delegate) {
super(delegate.getContext());
this.delegate = delegate;
}
protected IMessageDispatcher getDelegate() {
return delegate;
}
@Override
public IHandlerInvocation getInvocation() {
return delegate.getInvocation();
}
}

View File

@ -1,34 +1,27 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.subscription.MessageEnvelope;
/**
* Todo: Add javadoc
* The enveloped dispatcher will wrap published messages in an envelope before
* passing them to their configured dispatcher.
*
* All enveloped message handlers will have this dispatcher in their chain
*
* @author bennidi
* Date: 12/12/12
*/
public class EnvelopedMessageDispatcher implements IMessageDispatcher {
public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher{
private IMessageDispatcher del;
public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) {
this.del = dispatcher;
super(dispatcher);
}
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
del.dispatch(new MessageEnvelope(message), listeners);
}
@Override
public MessagingContext getContext() {
return del.getContext();
}
@Override
public IHandlerInvocation getInvocation() {
return del.getInvocation();
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
getDelegate().dispatch(publication, new MessageEnvelope(message), listeners);
}
}

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.listener.IMessageFilter;
@ -11,14 +12,12 @@ import net.engio.mbassy.listener.IMessageFilter;
* @author bennidi
* Date: 11/23/12
*/
public class FilteredMessageDispatcher implements IMessageDispatcher {
public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
private final IMessageFilter[] filter;
private IMessageDispatcher del;
public FilteredMessageDispatcher(IMessageDispatcher dispatcher) {
this.del = dispatcher;
super(dispatcher);
this.filter = dispatcher.getContext().getHandlerMetadata().getFilter();
}
@ -37,21 +36,10 @@ public class FilteredMessageDispatcher implements IMessageDispatcher {
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
if(passesFilter(message)){
del.dispatch(message, listeners);
getDelegate().dispatch(publication, message, listeners);
}
}
@Override
public MessagingContext getContext() {
return del.getContext();
}
@Override
public IHandlerInvocation getInvocation() {
return del.getInvocation();
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import java.lang.reflect.Method;
import net.engio.mbassy.MessagePublication;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
@ -11,21 +11,15 @@ import java.lang.reflect.Method;
* @author bennidi
* Date: 11/23/12
*/
public interface IHandlerInvocation {
public interface IHandlerInvocation extends ISubscriptionContextAware {
/**
* Invoke the message delivery logic of this handler invocation
* Invoke the message delivery logic of this handler
*
* @param handler The method that represents the actual message handler logic of the listener
* @param listener The listener that will receive the message
* @param message The message to be delivered to the listener
*/
public void invoke(final Method handler, final Object listener, final Object message);
public void invoke(final Object listener, final Object message);
/**
* Get the messaging context associated with this invocation
* @return
*/
public MessagingContext getContext();
}

View File

@ -0,0 +1,14 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IMessageBus;
/**
* This interface marks components that have access to the message bus that they belong to.
*
* @author bennidi
* Date: 3/1/13
*/
public interface IMessageBusAware {
public IMessageBus getBus();
}

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
/**
@ -11,30 +12,24 @@ import net.engio.mbassy.common.ConcurrentSet;
* handler invocation object associated with the dispatcher.
*
* Implementations if IMessageDispatcher are partially designed using decorator pattern
* such that it is possible to compose different message dispatchers to achieve more complex
* dispatch logic.
* such that it is possible to compose different message dispatchers into dispatcher chains
* to achieve more complex dispatch logic.
*
* @author bennidi
* Date: 11/23/12
*/
public interface IMessageDispatcher {
public interface IMessageDispatcher extends ISubscriptionContextAware {
/**
* Delivers the given message to the given set of listeners.
* Delivery may be delayed, aborted or restricted in various ways, depending
* on the configuration of the dispatcher
*
* @param publication The message publication that initiated the dispatch
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
public void dispatch(Object message, ConcurrentSet listeners);
/**
* Get the messaging context associated with this dispatcher
*
* @return
*/
public MessagingContext getContext();
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners);
/**
* Get the handler invocation that will be used to deliver the message to each

View File

@ -0,0 +1,17 @@
package net.engio.mbassy.dispatch;
/**
* This interface marks components that have access to the subscription context.
*
* @author bennidi
* Date: 3/1/13
*/
public interface ISubscriptionContextAware extends IMessageBusAware {
/**
* Get the subscription context associated with this object
*
* @return
*/
public SubscriptionContext getContext();
}

View File

@ -2,7 +2,10 @@ package net.engio.mbassy.dispatch;
import java.lang.reflect.Method;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
* Standard implementation for direct, unfiltered message delivery.
@ -14,31 +17,26 @@ import net.engio.mbassy.common.ConcurrentSet;
* @author bennidi
* Date: 11/23/12
*/
public class MessageDispatcher implements IMessageDispatcher {
private MessagingContext context;
public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
private IHandlerInvocation invocation;
public MessageDispatcher(MessagingContext context, IHandlerInvocation invocation) {
this.context = context;
public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
super(context);
this.invocation = invocation;
}
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
Method handler = getContext().getHandlerMetadata().getHandler();
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
publication.markDelivered();
for(Object listener: listeners){
getInvocation().invoke(handler, listener, message);
getInvocation().invoke(listener, message);
}
}
public MessagingContext getContext() {
return context;
}
@Override
public IHandlerInvocation getInvocation() {
return invocation;
}
}

View File

@ -1,6 +1,13 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Uses reflection to invoke a message handler for a given message.
@ -8,14 +15,51 @@ import java.lang.reflect.Method;
* @author bennidi
* Date: 11/23/12
*/
public class ReflectiveHandlerInvocation extends AbstractHandlerInvocation implements IHandlerInvocation {
public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation{
public ReflectiveHandlerInvocation(MessagingContext context) {
public ReflectiveHandlerInvocation(SubscriptionContext context) {
super(context);
}
protected void handlePublicationError(PublicationError error){
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
for(IPublicationErrorHandler handler : handlers){
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener, Method handler){
try {
handler.invoke(listener, message);
}catch(IllegalAccessException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
handler, listener, message));
}
catch(IllegalArgumentException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
}
catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
handler, listener, message));
}
catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handler, listener, message));
}
}
@Override
public void invoke(final Method handler, final Object listener, final Object message) {
invokeHandler(message, listener, handler);
public void invoke(final Object listener, final Object message) {
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());
}
}

View File

@ -4,21 +4,21 @@ import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.listener.MessageHandlerMetadata;
/**
* The messaging context holds all data/objects that is relevant to successfully publish
* The subscription context holds all (meta)data/objects that are relevant to successfully publish
* a message within a subscription. A one-to-one relation between a subscription and
* MessagingContext holds -> a messaging context is created for each distinct subscription
* subscription context holds -> a subscription context is created for each distinct subscription
* that lives inside a message bus.
*
* @author bennidi
* Date: 11/23/12
*/
public class MessagingContext {
public class SubscriptionContext {
private IMessageBus owningBus;
private MessageHandlerMetadata handlerMetadata;
public MessagingContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) {
public SubscriptionContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) {
this.owningBus = owningBus;
this.handlerMetadata = handlerMetadata;
}

View File

@ -0,0 +1,29 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.dispatch.ISubscriptionContextAware;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* The base implementation for subscription context aware objects (mightily obvious :)
*
* @author bennidi
* Date: 3/1/13
*/
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware{
private SubscriptionContext context;
public AbstractSubscriptionContextAware(SubscriptionContext context) {
this.context = context;
}
public SubscriptionContext getContext() {
return context;
}
@Override
public IMessageBus getBus() {
return context.getOwningBus();
}
}

View File

@ -3,9 +3,10 @@ package net.engio.mbassy.subscription;
import java.util.Comparator;
import java.util.UUID;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.MessagingContext;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* A subscription is a thread safe container for objects that contain message handlers
@ -18,9 +19,9 @@ public class Subscription {
private IMessageDispatcher dispatcher;
private MessagingContext context;
private SubscriptionContext context;
public Subscription(MessagingContext context, IMessageDispatcher dispatcher) {
public Subscription(SubscriptionContext context, IMessageDispatcher dispatcher) {
this.context = context;
this.dispatcher = dispatcher;
}
@ -31,12 +32,8 @@ public class Subscription {
}
public void publish(Object message){
dispatcher.dispatch(message, listeners);
}
public MessagingContext getContext(){
return context;
public void publish(MessagePublication publication, Object message){
dispatcher.dispatch(publication, message, listeners);
}
public int getPriority(){

View File

@ -1,13 +1,7 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.dispatch.AsynchronousHandlerInvocation;
import net.engio.mbassy.dispatch.EnvelopedMessageDispatcher;
import net.engio.mbassy.dispatch.FilteredMessageDispatcher;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.MessageDispatcher;
import net.engio.mbassy.dispatch.MessagingContext;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* Created with IntelliJ IDEA.
@ -18,13 +12,13 @@ import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
*/
public class SubscriptionFactory {
public Subscription createSubscription(MessagingContext context){
public Subscription createSubscription(SubscriptionContext context){
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher);
}
protected IHandlerInvocation buildInvocationForHandler(MessagingContext context){
protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context){
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context);
if(context.getHandlerMetadata().isAsynchronous()){
invocation = new AsynchronousHandlerInvocation(invocation);
@ -32,7 +26,7 @@ public class SubscriptionFactory {
return invocation;
}
protected IMessageDispatcher buildDispatcher(MessagingContext context, IHandlerInvocation invocation){
protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation){
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
if(context.getHandlerMetadata().isEnveloped()){
dispatcher = new EnvelopedMessageDispatcher(dispatcher);

View File

@ -7,7 +7,7 @@ import net.engio.mbassy.listener.Listener;
import org.junit.Test;
/**
* Verify correct behaviour in case of empty message publications
* Verify correct behaviour in case of message publications that do not have any matching subscriptions
*
* @author bennidi
* Date: 1/18/13