diff --git a/src/main/java/org/mbassy/AbstractMessageBus.java b/src/main/java/org/mbassy/AbstractMessageBus.java index 05fbcea..40de146 100644 --- a/src/main/java/org/mbassy/AbstractMessageBus.java +++ b/src/main/java/org/mbassy/AbstractMessageBus.java @@ -48,6 +48,8 @@ public abstract class AbstractMessageBus // all pending messages scheduled for asynchronous dispatch are queued here private final LinkedBlockingQueue> pendingMessages = new LinkedBlockingQueue>(); + // this factory is used to create specialized subscriptions based on the given message handler configuration + // it can be customized by implementing the getSubscriptionFactory() method private final SubscriptionFactory subscriptionFactory; // initialize the dispatch workers @@ -72,6 +74,7 @@ public abstract class AbstractMessageBus } } + public AbstractMessageBus() { this(2); } @@ -102,9 +105,9 @@ public abstract class AbstractMessageBus if (listener == null) return false; Collection subscriptions = subscriptionsPerListener.get(listener.getClass()); if (subscriptions == null) return false; - boolean isRemoved = false; + boolean isRemoved = true; for (Subscription subscription : subscriptions) { - isRemoved = isRemoved || subscription.unsubscribe(listener); + isRemoved = isRemoved && subscription.unsubscribe(listener); } return isRemoved; } @@ -165,24 +168,15 @@ public abstract class AbstractMessageBus if (subscriptionsPerMessage.get(messageType) != null) { subscriptions.addAll(subscriptionsPerMessage.get(messageType)); } - for (Class eventSuperType : getSuperclasses(messageType)) { + for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) { if (subscriptionsPerMessage.get(eventSuperType) != null) { subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType)); } } - // IMPROVEMENT: use tree list that sorts during insertion - //Collections.sort(subscriptions, new SubscriptionByPriorityDesc()); return subscriptions; } - private Collection getSuperclasses(Class from) { - Collection superclasses = new LinkedList(); - while (!from.equals(Object.class)) { - superclasses.add(from.getSuperclass()); - from = from.getSuperclass(); - } - return superclasses; - } + // associate a suscription with a message type private void addMessageTypeSubscription(Class messageType, Subscription subscription) { diff --git a/src/main/java/org/mbassy/IMessageBus.java b/src/main/java/org/mbassy/IMessageBus.java index 07c1cda..9d86ef5 100644 --- a/src/main/java/org/mbassy/IMessageBus.java +++ b/src/main/java/org/mbassy/IMessageBus.java @@ -7,8 +7,8 @@ import java.util.concurrent.Executor; * * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T. - * The dispatch mechanism can by controlled for each concrete message publication. - * A message publication is the publication of any message using one of the bus' publish(..) methods. + * The dispatch mechanism can by controlled for per message handler and message publication. + * A message publication is the publication of any message using one of the bus' publication methods. *

* Each message publication is isolated from all other running publications such that it does not interfere with them. * Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple @@ -22,19 +22,22 @@ import java.util.concurrent.Executor; * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are * removed on-the-fly as messages get dispatched. *

- * Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any - * class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver + * Generally message handlers will be invoked in inverse sequence of subscription but any + * client using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver * a specific message exactly once to each of the subscribed message handlers. *

* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally * a message handler may define filters to narrow the set of messages that it accepts. *

* Subscribed message handlers are available to all pending message publications that have not yet started processing. - * Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored) + * Any message listener may only be subscribed once -> subsequent subscriptions of an already subscribed message listener + * will be silently ignored) *

- * Removing a listener means removing all subscribed message handlers of that object. This remove operation - * immediately takes effect and on all running dispatch processes. A removed listener (a listener + * Removing a listener (unsubscribing) means removing all subscribed message handlers of that listener. This remove operation + * immediately takes effect and on all running dispatch processes -> A removed listener (a listener * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications. + * Any running message publication that has not yet delivered the message to the removed listener will not see the listener + * after the remove operation completed. * * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will * get dispatched to all message handlers that take an instance of List as their parameter @@ -55,11 +58,12 @@ public interface IMessageBus { /** - * Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns + * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers * have effectively been removed and will not receive any message publications (including asynchronously scheduled - * publications that have been published when the messageHandler was still subscribed). - * A call to this method passing null, an already subscribed message or any message that does not define any listeners - * will not have any effect. + * publications that have been published when the message listener was still subscribed). + * + * A call to this method passing null, an already unsubscribed listener or any object that does not define any message + * handlers will not have any effect and is silently ignored. * * @param listener * @return true, if the listener was found and successfully removed @@ -91,14 +95,34 @@ public interface IMessageBus { */ public Collection getRegisteredErrorHandlers(); + + /** + * Get the executor service that is used to asynchronous message publication. + * The executor is passed to the message bus at creation time. + * + * @return + */ public Executor getExecutor(); - + /** + * A post command is used as an intermediate object created by a call to the message bus' post method. + * It encapsulates the functionality provided by the message bus that created the command. + * Subclasses may extend this interface and add functionality, e.g. different dispatch schemes. + * + */ public static interface IPostCommand{ + /** + * Execute the message publication immediately. This call blocks until every matching message handler + * has been invoked. + */ public void now(); + /** + * Execute the message publication asynchronously. This call return immediately and all matching message handlers + * will be invoked in another thread. + */ public void asynchronously(); } diff --git a/src/main/java/org/mbassy/IPublicationErrorHandler.java b/src/main/java/org/mbassy/IPublicationErrorHandler.java index 3cfa94b..e173037 100644 --- a/src/main/java/org/mbassy/IPublicationErrorHandler.java +++ b/src/main/java/org/mbassy/IPublicationErrorHandler.java @@ -1,13 +1,21 @@ package org.mbassy; /** - * TODO. Insert class description here + * Publication error handlers are provided with a publication error every time an error occurs during message publication. + * A handler might fail with an exception, not be accessible because of the presence of a security manager + * or other reasons might lead to failures during the message publication process. + * *

* @author bennidi * Date: 2/22/12 */ public interface IPublicationErrorHandler { + /** + * Handle the given publication error. + * + * @param error + */ public void handleError(PublicationError error); // This is the default error handler it will simply log to standard out and diff --git a/src/main/java/org/mbassy/MBassador.java b/src/main/java/org/mbassy/MBassador.java index c9684ff..e6d96d1 100644 --- a/src/main/java/org/mbassy/MBassador.java +++ b/src/main/java/org/mbassy/MBassador.java @@ -6,7 +6,7 @@ import java.util.*; import java.util.concurrent.*; -public class MBassador extends AbstractMessageBus>{ +public class MBassador extends AbstractMessageBus>{ public MBassador(){ this(2); @@ -56,8 +56,8 @@ public class MBassador extends AbstractMessageBus>{ @Override - public SimplePostCommand post(T message) { - return new SimplePostCommand(this, message); + public SyncAsyncPostCommand post(T message) { + return new SyncAsyncPostCommand(this, message); } } diff --git a/src/main/java/org/mbassy/SimplePostCommand.java b/src/main/java/org/mbassy/SyncAsyncPostCommand.java similarity index 54% rename from src/main/java/org/mbassy/SimplePostCommand.java rename to src/main/java/org/mbassy/SyncAsyncPostCommand.java index 374ef19..e1ae3cc 100644 --- a/src/main/java/org/mbassy/SimplePostCommand.java +++ b/src/main/java/org/mbassy/SyncAsyncPostCommand.java @@ -1,18 +1,17 @@ package org.mbassy; /** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/12/12 -* Time: 8:44 PM -* To change this template use File | Settings | File Templates. -*/ -public class SimplePostCommand implements IMessageBus.IPostCommand { + * This post command provides access to standard synchronous and asynchronous dispatch + * + * @author bennidi + * Date: 11/12/12 + */ +public class SyncAsyncPostCommand implements IMessageBus.IPostCommand { private T message; private MBassador mBassador; - public SimplePostCommand(MBassador mBassador, T message) { + public SyncAsyncPostCommand(MBassador mBassador, T message) { this.mBassador = mBassador; this.message = message; } diff --git a/src/main/java/org/mbassy/common/ReflectionUtils.java b/src/main/java/org/mbassy/common/ReflectionUtils.java index 5618085..f68c5e0 100644 --- a/src/main/java/org/mbassy/common/ReflectionUtils.java +++ b/src/main/java/org/mbassy/common/ReflectionUtils.java @@ -2,6 +2,7 @@ package org.mbassy.common; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -50,6 +51,15 @@ public class ReflectionUtils { return filtered; } + public static Collection getSuperclasses(Class from) { + Collection superclasses = new LinkedList(); + while (!from.equals(Object.class)) { + superclasses.add(from.getSuperclass()); + from = from.getSuperclass(); + } + return superclasses; + } + public static boolean containsOverridingMethod(List allMethods, Method methodToCheck) { for (Method method : allMethods) { if (isOverriddenBy(methodToCheck, method)) return true; diff --git a/src/main/java/org/mbassy/dispatch/AbstractHandlerInvocation.java b/src/main/java/org/mbassy/dispatch/AbstractHandlerInvocation.java new file mode 100644 index 0000000..50e537f --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/AbstractHandlerInvocation.java @@ -0,0 +1,65 @@ +package org.mbassy.dispatch; + +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.PublicationError; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 11/23/12 + */ +public class AbstractHandlerInvocation { + + private MessagingContext context; + + protected void handlePublicationError(PublicationError error){ + Collection handlers = getContext().getOwningBus().getRegisteredErrorHandlers(); + for(IPublicationErrorHandler handler : handlers){ + handler.handleError(error); + } + } + + protected void invokeHandler(final Object message, final Object listener, Method handler){ + try { + handler.invoke(listener, message); + }catch(IllegalAccessException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "The class or method is not accessible", + handler, listener, message)); + } + catch(IllegalArgumentException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0], + handler, listener, message)); + } + catch (InvocationTargetException e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Message handler threw exception", + handler, listener, message)); + } + catch (Throwable e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Unexpected exception", + handler, listener, message)); + } + } + + + public AbstractHandlerInvocation(MessagingContext context) { + this.context = context; + } + + public MessagingContext getContext() { + return context; + } +} diff --git a/src/main/java/org/mbassy/dispatch/AsynchronousHandlerInvocation.java b/src/main/java/org/mbassy/dispatch/AsynchronousHandlerInvocation.java new file mode 100644 index 0000000..fdbe5e7 --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/AsynchronousHandlerInvocation.java @@ -0,0 +1,39 @@ +package org.mbassy.dispatch; + +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.PublicationError; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; + +/** + * This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously + * + * @author bennidi + * Date: 11/23/12 + */ +public class AsynchronousHandlerInvocation implements IHandlerInvocation { + + private IHandlerInvocation delegate; + + public AsynchronousHandlerInvocation(IHandlerInvocation delegate) { + super(); + this.delegate = delegate; + } + + @Override + public void invoke(final Method handler, final Object listener, final Object message) { + getContext().getOwningBus().getExecutor().execute(new Runnable() { + @Override + public void run() { + delegate.invoke(handler, listener, message); + } + }); + } + + @Override + public MessagingContext getContext() { + return delegate.getContext(); + } +} diff --git a/src/main/java/org/mbassy/dispatch/FilteredMessageDispatcher.java b/src/main/java/org/mbassy/dispatch/FilteredMessageDispatcher.java new file mode 100644 index 0000000..c064ffd --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/FilteredMessageDispatcher.java @@ -0,0 +1,57 @@ +package org.mbassy.dispatch; + +import org.mbassy.common.ConcurrentSet; +import org.mbassy.listener.IMessageFilter; + +/** + * A dispatcher that implements message filtering based on the filter configuration + * of the associated message handler. It will delegate message delivery to another + * message dispatcher after having performed the filtering logic. + * + * @author bennidi + * Date: 11/23/12 + */ +public class FilteredMessageDispatcher implements IMessageDispatcher { + + private final IMessageFilter[] filter; + + private IMessageDispatcher del; + + public FilteredMessageDispatcher(IMessageDispatcher dispatcher) { + this.del = dispatcher; + this.filter = dispatcher.getContext().getHandlerMetadata().getFilter(); + } + + private boolean passesFilter(Object message) { + + if (filter == null) { + return true; + } + else { + for (int i = 0; i < filter.length; i++) { + if (!filter[i].accepts(message, getContext().getHandlerMetadata())) return false; + } + return true; + } + } + + + @Override + public void dispatch(Object message, ConcurrentSet listeners) { + if(passesFilter(message)){ + del.dispatch(message, listeners); + } + } + + @Override + public MessagingContext getContext() { + return del.getContext(); + } + + @Override + public IHandlerInvocation getInvocation() { + return del.getInvocation(); + } + + +} diff --git a/src/main/java/org/mbassy/dispatch/IHandlerInvocation.java b/src/main/java/org/mbassy/dispatch/IHandlerInvocation.java new file mode 100644 index 0000000..f015fb6 --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/IHandlerInvocation.java @@ -0,0 +1,31 @@ +package org.mbassy.dispatch; + +import java.lang.reflect.Method; + +/** + * A handler invocation encapsulates the logic that is used to invoke a single + * message handler to process a given message. + * A handler invocation might come in different flavours and can be composed + * of various independent invocations be means of delegation (decorator pattern) + * + * @author bennidi + * Date: 11/23/12 + */ +public interface IHandlerInvocation { + + /** + * Invoke the message delivery logic of this handler invocation + * + * @param handler The method that represents the actual message handler logic of the listener + * @param listener The listener that will receive the message + * @param message The message to be delivered to the listener + */ + public void invoke(final Method handler, final Object listener, final Object message); + + /** + * Get the messaging context associated with this invocation + * @return + */ + public MessagingContext getContext(); + +} diff --git a/src/main/java/org/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/org/mbassy/dispatch/IMessageDispatcher.java new file mode 100644 index 0000000..132eb05 --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/IMessageDispatcher.java @@ -0,0 +1,46 @@ +package org.mbassy.dispatch; + +import org.mbassy.common.ConcurrentSet; +import org.mbassy.subscription.Subscription; + +/** + * A message dispatcher provides the functionality to deliver a single message + * to a set of listeners. A message dispatcher uses a message context to access + * all information necessary for the message delivery. + * + * The delivery of a single message to a single listener is responsibility of the + * handler invocation object associated with the dispatcher. + * + * Implementations if IMessageDispatcher are partially designed using decorator pattern + * such that it is possible to compose different message dispatchers to achieve more complex + * dispatch logic. + * + * @author bennidi + * Date: 11/23/12 + */ +public interface IMessageDispatcher { + + /** + * Delivers the given message to the given set of listeners. + * Delivery may be delayed, aborted or restricted in various ways, depending + * on the configuration of the dispatcher + * + * @param message The message that should be delivered to the listeners + * @param listeners The listeners that should receive the message + */ + public void dispatch(Object message, ConcurrentSet listeners); + + /** + * Get the messaging context associated with this dispatcher + * + * @return + */ + public MessagingContext getContext(); + + /** + * Get the handler invocation that will be used to deliver the message to each + * listener + * @return + */ + public IHandlerInvocation getInvocation(); +} diff --git a/src/main/java/org/mbassy/dispatch/MessageDispatcher.java b/src/main/java/org/mbassy/dispatch/MessageDispatcher.java new file mode 100644 index 0000000..c9f133f --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/MessageDispatcher.java @@ -0,0 +1,45 @@ +package org.mbassy.dispatch; + +import org.mbassy.common.ConcurrentSet; +import org.mbassy.subscription.Subscription; + +import java.lang.reflect.Method; + +/** + * Standard implementation for direct, unfiltered message delivery. + * + * For each message delivery, this dispatcher iterates over the listeners + * and uses the previously provided handler invocation to deliver the message + * to each listener + * + * @author bennidi + * Date: 11/23/12 + */ +public class MessageDispatcher implements IMessageDispatcher { + + private MessagingContext context; + + private IHandlerInvocation invocation; + + public MessageDispatcher(MessagingContext context, IHandlerInvocation invocation) { + this.context = context; + this.invocation = invocation; + } + + @Override + public void dispatch(Object message, ConcurrentSet listeners) { + Method handler = getContext().getHandlerMetadata().getHandler(); + for(Object listener: listeners){ + getInvocation().invoke(handler, listener, message); + } + } + + public MessagingContext getContext() { + return context; + } + + @Override + public IHandlerInvocation getInvocation() { + return invocation; + } +} diff --git a/src/main/java/org/mbassy/dispatch/MessagingContext.java b/src/main/java/org/mbassy/dispatch/MessagingContext.java new file mode 100644 index 0000000..7687c84 --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/MessagingContext.java @@ -0,0 +1,44 @@ +package org.mbassy.dispatch; + +import org.mbassy.IMessageBus; +import org.mbassy.listener.MessageHandlerMetadata; + +/** + * The messaging context holds all data/objects that is relevant to successfully publish + * a message within a subscription. A one-to-one relation between a subscription and + * MessagingContext holds -> a messaging context is created for each distinct subscription + * that lives inside a message bus. + * + * @author bennidi + * Date: 11/23/12 + */ +public class MessagingContext { + + private IMessageBus owningBus; + + private MessageHandlerMetadata handlerMetadata; + + public MessagingContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) { + this.owningBus = owningBus; + this.handlerMetadata = handlerMetadata; + } + + /** + * Get a reference to the message bus this context belongs to + * @return + */ + public IMessageBus getOwningBus() { + return owningBus; + } + + + /** + * Get the meta data that specifies the characteristics of the message handler + * that is associated with this context + * @return + */ + public MessageHandlerMetadata getHandlerMetadata() { + return handlerMetadata; + } + +} diff --git a/src/main/java/org/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/org/mbassy/dispatch/ReflectiveHandlerInvocation.java new file mode 100644 index 0000000..abcfb95 --- /dev/null +++ b/src/main/java/org/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -0,0 +1,21 @@ +package org.mbassy.dispatch; + +import java.lang.reflect.Method; + +/** + * Uses reflection to invoke a message handler for a given message. + * + * @author bennidi + * Date: 11/23/12 + */ +public class ReflectiveHandlerInvocation extends AbstractHandlerInvocation implements IHandlerInvocation { + + public ReflectiveHandlerInvocation(MessagingContext context) { + super(context); + } + + @Override + public void invoke(final Method handler, final Object listener, final Object message) { + invokeHandler(message, listener, handler); + } +} diff --git a/src/main/java/org/mbassy/listener/Filter.java b/src/main/java/org/mbassy/listener/Filter.java index 370dac1..a9e88b8 100644 --- a/src/main/java/org/mbassy/listener/Filter.java +++ b/src/main/java/org/mbassy/listener/Filter.java @@ -12,12 +12,12 @@ import java.lang.annotation.Target; * to the message listener or not. * *

- * @author benni + * @author bennidi * Date: 2/14/12 */ @Retention(value = RetentionPolicy.RUNTIME) @Target(value = {ElementType.ANNOTATION_TYPE}) public @interface Filter { - Class value(); + Class value(); } diff --git a/src/main/java/org/mbassy/listener/IMessageFilter.java b/src/main/java/org/mbassy/listener/IMessageFilter.java new file mode 100644 index 0000000..2536dda --- /dev/null +++ b/src/main/java/org/mbassy/listener/IMessageFilter.java @@ -0,0 +1,54 @@ +package org.mbassy.listener; + +import java.util.HashMap; +import java.util.Map; + +/** + * Message filters can be used to prevent certain messages to be delivered to a specific listener. + * If a filter is used the message will only be delivered if it passes the filter(s) + * + * NOTE: A message filter must provide either a no-arg constructor. + * + * @author bennidi + * Date: 2/8/12 + */ +public interface IMessageFilter { + + /** + * Evaluate the message to ensure that it matches the handler configuration + * + * + * @param message the message to be delivered + * @return + */ + public boolean accepts(Object message, MessageHandlerMetadata metadata); + + + + public static final class All implements IMessageFilter { + + @Override + public boolean accepts(Object event, MessageHandlerMetadata metadata) { + return true; + } + } + + public static final class None implements IMessageFilter { + + @Override + public boolean accepts(Object event, MessageHandlerMetadata metadata) { + return false; + } + } + + + public static final class DontAllowSubtypes implements IMessageFilter { + + @Override + public boolean accepts(Object event, MessageHandlerMetadata metadata) { + return event.getClass().equals(metadata.getDeclaredMessageType()); + } + } + + +} diff --git a/src/main/java/org/mbassy/listener/Listener.java b/src/main/java/org/mbassy/listener/Listener.java index db213fb..09bc399 100644 --- a/src/main/java/org/mbassy/listener/Listener.java +++ b/src/main/java/org/mbassy/listener/Listener.java @@ -3,17 +3,18 @@ package org.mbassy.listener; import java.lang.annotation.*; /** - * TODO. Insert class description here - *

+ * Mark any method of any object as a message handler and configure the handler + * using different properties. + * * @author bennidi * Date: 2/8/12 - * Time: 3:35 PM */ @Retention(value = RetentionPolicy.RUNTIME) @Inherited @Target(value = {ElementType.METHOD}) public @interface Listener { + Filter[] filters() default {}; // no filters by default Mode dispatch() default Mode.Synchronous; diff --git a/src/main/java/org/mbassy/listener/MessageFilter.java b/src/main/java/org/mbassy/listener/MessageFilter.java deleted file mode 100644 index ebe6b1e..0000000 --- a/src/main/java/org/mbassy/listener/MessageFilter.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.mbassy.listener; - -/** - * Object filters can be used to prevent certain messages to be delivered to a specific listener. - * If a filter is used the message will only be delivered if it passes the filter(s) - * - * @author bennidi - * Date: 2/8/12 - */ -public interface MessageFilter { - - /** - * Evaluate the message and listener to ensure that the message should be handled by the listener - * - * - * @param event the event to be delivered - * @param listener the listener instance that would receive the event if it passes the filter - * @return - */ - public boolean accepts(Object event, Object listener); - - - public static final class All implements MessageFilter { - - @Override - public boolean accepts(Object event, Object listener) { - return true; - } - } - - public static final class None implements MessageFilter { - - @Override - public boolean accepts(Object event, Object listener) { - return false; - } - } - - -} diff --git a/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java b/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java index 14cfd2a..e7d320a 100644 --- a/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java +++ b/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java @@ -1,12 +1,10 @@ package org.mbassy.listener; -import org.mbassy.listener.Listener; -import org.mbassy.listener.Mode; -import org.mbassy.listener.MessageFilter; - import java.lang.reflect.Method; /** + * + * * @author bennidi * Date: 11/14/12 */ @@ -14,18 +12,19 @@ public class MessageHandlerMetadata { private Method handler; - private MessageFilter[] filter; + private IMessageFilter[] filter; private Listener listenerConfig; private boolean isAsynchronous = false; - public MessageHandlerMetadata(Method handler, MessageFilter[] filter, Listener listenerConfig) { + public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) { this.handler = handler; this.filter = filter; this.listenerConfig = listenerConfig; this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous); + this.handler.setAccessible(true); } @@ -34,7 +33,7 @@ public class MessageHandlerMetadata { } public boolean isFiltered(){ - return filter == null || filter.length == 0; + return filter != null && filter.length > 0; } public int getPriority(){ @@ -45,7 +44,11 @@ public class MessageHandlerMetadata { return handler; } - public MessageFilter[] getFilter() { + public IMessageFilter[] getFilter() { return filter; } + + public Class getDeclaredMessageType(){ + return handler.getParameterTypes()[0]; + } } diff --git a/src/main/java/org/mbassy/listener/MetadataReader.java b/src/main/java/org/mbassy/listener/MetadataReader.java index a881819..90f6b3f 100644 --- a/src/main/java/org/mbassy/listener/MetadataReader.java +++ b/src/main/java/org/mbassy/listener/MetadataReader.java @@ -28,15 +28,15 @@ public class MetadataReader { }; // cache already created filter instances - private final Map, MessageFilter> filterCache = new HashMap, MessageFilter>(); + private final Map, IMessageFilter> filterCache = new HashMap, IMessageFilter>(); // retrieve all instances of filters associated with the given subscription - private MessageFilter[] getFilter(Listener subscription) throws Exception{ + private IMessageFilter[] getFilter(Listener subscription) throws Exception{ if (subscription.filters().length == 0) return null; - MessageFilter[] filters = new MessageFilter[subscription.filters().length]; + IMessageFilter[] filters = new IMessageFilter[subscription.filters().length]; int i = 0; for (Filter filterDef : subscription.filters()) { - MessageFilter filter = filterCache.get(filterDef.value()); + IMessageFilter filter = filterCache.get(filterDef.value()); if (filter == null) { filter = filterDef.value().newInstance(); filterCache.put(filterDef.value(), filter); @@ -51,7 +51,7 @@ public class MetadataReader { public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) throws Exception{ Listener config = messageHandler.getAnnotation(Listener.class); - MessageFilter[] filter = getFilter(config); + IMessageFilter[] filter = getFilter(config); return new MessageHandlerMetadata(messageHandler, filter, config); } diff --git a/src/main/java/org/mbassy/subscription/FilteredAsynchronousSubscription.java b/src/main/java/org/mbassy/subscription/FilteredAsynchronousSubscription.java deleted file mode 100644 index d81e700..0000000 --- a/src/main/java/org/mbassy/subscription/FilteredAsynchronousSubscription.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.listener.MessageFilter; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:50 PM -* To change this template use File | Settings | File Templates. -*/ -public class FilteredAsynchronousSubscription extends FilteredSubscription{ - - public FilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - - getMessageBus().getExecutor().execute(new Runnable() { - @Override - public void run() { - invokeHandler(message, listener); - } - }); - } -} diff --git a/src/main/java/org/mbassy/subscription/FilteredSubscription.java b/src/main/java/org/mbassy/subscription/FilteredSubscription.java deleted file mode 100644 index a70d2e1..0000000 --- a/src/main/java/org/mbassy/subscription/FilteredSubscription.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.MBassador; -import org.mbassy.listener.MessageFilter; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; -import java.util.Iterator; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:48 PM -* To change this template use File | Settings | File Templates. -*/ -public abstract class FilteredSubscription extends Subscription{ - - private final MessageFilter[] filter; - - - public FilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - this.filter = messageHandler.getFilter(); - } - - private boolean passesFilter(Object message, Object listener) { - - if (filter == null) { - return true; - } - else { - for (int i = 0; i < filter.length; i++) { - if (!filter[i].accepts(message, listener)) return false; - } - return true; - } - } - - public void publish(Object message) { - - Iterator iterator = listeners.iterator(); - Object listener = null; - while ((listener = iterator.next()) != null) { - if(passesFilter(message, listener)) { - dispatch(message, listener); - } - } - } -} diff --git a/src/main/java/org/mbassy/subscription/FilteredSynchronousSubscription.java b/src/main/java/org/mbassy/subscription/FilteredSynchronousSubscription.java deleted file mode 100644 index a0f529c..0000000 --- a/src/main/java/org/mbassy/subscription/FilteredSynchronousSubscription.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.listener.MessageFilter; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:49 PM -* To change this template use File | Settings | File Templates. -*/ -public class FilteredSynchronousSubscription extends FilteredSubscription { - - - public FilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - invokeHandler(message, listener); - } -} diff --git a/src/main/java/org/mbassy/subscription/Subscription.java b/src/main/java/org/mbassy/subscription/Subscription.java index 7f744b8..f744701 100644 --- a/src/main/java/org/mbassy/subscription/Subscription.java +++ b/src/main/java/org/mbassy/subscription/Subscription.java @@ -4,8 +4,11 @@ import org.mbassy.IMessageBus; import org.mbassy.IPublicationErrorHandler; import org.mbassy.common.ConcurrentSet; import org.mbassy.PublicationError; +import org.mbassy.dispatch.IMessageDispatcher; +import org.mbassy.dispatch.MessagingContext; import org.mbassy.listener.MessageHandlerMetadata; +import javax.xml.ws.handler.MessageContext; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collection; @@ -13,81 +16,39 @@ import java.util.Comparator; import java.util.UUID; /** - * Subscription is a thread safe container for objects that contain message handlers + * A subscription is a thread safe container for objects that contain message handlers */ -public abstract class Subscription { +public class Subscription { private UUID id = UUID.randomUUID(); - private final Method handler; - protected ConcurrentSet listeners = new ConcurrentSet(); - private int priority = 0; + private IMessageDispatcher dispatcher; - private IMessageBus owningBus ; + private MessagingContext context; - public Subscription(IMessageBus owningBus, MessageHandlerMetadata messageHandler) { - this.owningBus = owningBus; - this.priority = messageHandler.getPriority(); - this.handler = messageHandler.getHandler(); - this.handler.setAccessible(true); + public Subscription(MessagingContext context, IMessageDispatcher dispatcher) { + this.context = context; + this.dispatcher = dispatcher; } - public abstract void publish(Object message); - protected abstract void dispatch(final Object message, final Object listener); + public void publish(Object message){ + dispatcher.dispatch(message, listeners); + } - - protected IMessageBus getMessageBus(){ - return owningBus; + public MessagingContext getContext(){ + return context; } public int getPriority(){ - return priority; + return context.getHandlerMetadata().getPriority(); } public void subscribe(Object o) { listeners.add(o); - - } - - protected void handlePublicationError(PublicationError error){ - Collection handlers = owningBus.getRegisteredErrorHandlers(); - for(IPublicationErrorHandler handler : handlers){ - handler.handleError(error); - } - } - - protected void invokeHandler(final Object message, final Object listener){ - try { - handler.invoke(listener, message); - }catch(IllegalAccessException e){ - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "The class or method is not accessible", - handler, listener, message)); - } - catch(IllegalArgumentException e){ - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0], - handler, listener, message)); - } - catch (InvocationTargetException e) { - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Message handler threw exception", - handler, listener, message)); - } - catch (Throwable e) { - handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Unexpected exception", - handler, listener, message)); - } } @@ -95,6 +56,10 @@ public abstract class Subscription { return listeners.remove(existingListener); } + public int size(){ + return listeners.size(); + } + public static final Comparator SubscriptionByPriorityDesc = new Comparator() { @Override diff --git a/src/main/java/org/mbassy/subscription/SubscriptionFactory.java b/src/main/java/org/mbassy/subscription/SubscriptionFactory.java index 4c75a08..a8485fc 100644 --- a/src/main/java/org/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/org/mbassy/subscription/SubscriptionFactory.java @@ -2,6 +2,7 @@ package org.mbassy.subscription; import org.mbassy.IMessageBus; import org.mbassy.IPublicationErrorHandler; +import org.mbassy.dispatch.*; import org.mbassy.listener.MessageHandlerMetadata; import java.util.Collection; @@ -17,27 +18,30 @@ public class SubscriptionFactory { private IMessageBus owner; - public SubscriptionFactory(IMessageBus owner) { this.owner = owner; } public Subscription createSubscription(MessageHandlerMetadata messageHandlerMetadata){ - if(messageHandlerMetadata.isFiltered()){ - if(messageHandlerMetadata.isAsynchronous()){ - return new UnfilteredAsynchronousSubscription(owner, messageHandlerMetadata); - } - else{ - return new UnfilteredSynchronousSubscription(owner, messageHandlerMetadata); - } - } - else{ - if(messageHandlerMetadata.isAsynchronous()){ - return new FilteredAsynchronousSubscription(owner, messageHandlerMetadata); - } - else{ - return new FilteredSynchronousSubscription(owner, messageHandlerMetadata); - } + MessagingContext context = new MessagingContext(owner, messageHandlerMetadata); + IHandlerInvocation invocation = buildInvocationForHandler(context); + IMessageDispatcher dispatcher = buildDispatcher(context, invocation); + return new Subscription(context, dispatcher); + } + + protected IHandlerInvocation buildInvocationForHandler(MessagingContext context){ + IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context); + if(context.getHandlerMetadata().isAsynchronous()){ + invocation = new AsynchronousHandlerInvocation(invocation); } + return invocation; + } + + protected IMessageDispatcher buildDispatcher(MessagingContext context, IHandlerInvocation invocation){ + IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation); + if(context.getHandlerMetadata().isFiltered()){ + dispatcher = new FilteredMessageDispatcher(dispatcher); + } + return dispatcher; } } diff --git a/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java deleted file mode 100644 index 68118d5..0000000 --- a/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.MBassador; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:48 PM -* To change this template use File | Settings | File Templates. -*/ -public class UnfilteredAsynchronousSubscription extends UnfilteredSubscription { - - public UnfilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - getMessageBus().getExecutor().execute(new Runnable() { - @Override - public void run() { - invokeHandler(message, listener); - } - }); - - } -} diff --git a/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java deleted file mode 100644 index ab7d11d..0000000 --- a/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.MBassador; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; -import java.util.Iterator; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:45 PM -* To change this template use File | Settings | File Templates. -*/ -public abstract class UnfilteredSubscription extends Subscription{ - - - public UnfilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - } - - public void publish(Object message) { - - Iterator iterator = listeners.iterator(); - Object listener = null; - while ((listener = iterator.next()) != null) { - dispatch(message, listener); - } - } -} diff --git a/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java deleted file mode 100644 index 9ddd16e..0000000 --- a/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.mbassy.subscription; - -import org.mbassy.IMessageBus; -import org.mbassy.IPublicationErrorHandler; -import org.mbassy.MBassador; -import org.mbassy.listener.MessageHandlerMetadata; - -import java.lang.reflect.Method; -import java.util.Collection; - -/** -* Created with IntelliJ IDEA. -* @author bennidi -* Date: 11/14/12 -* Time: 3:49 PM -* To change this template use File | Settings | File Templates. -*/ -public class UnfilteredSynchronousSubscription extends UnfilteredSubscription{ - - public UnfilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { - super(mBassador, messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - invokeHandler(message, listener); - } -} diff --git a/src/test/java/org/mbassy/AllTests.java b/src/test/java/org/mbassy/AllTests.java new file mode 100644 index 0000000..dbc6147 --- /dev/null +++ b/src/test/java/org/mbassy/AllTests.java @@ -0,0 +1,19 @@ +package org.mbassy; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Test suite for running all available unit tests + * + * @author bennidi + * Date: 11/23/12 + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ConcurrentSetTest.class, + MBassadorTest.class, + FilterTest.class +}) +public class AllTests { +} diff --git a/src/test/java/org/mbassy/ConcurrentSetTest.java b/src/test/java/org/mbassy/ConcurrentSetTest.java index 395719c..09eb32a 100644 --- a/src/test/java/org/mbassy/ConcurrentSetTest.java +++ b/src/test/java/org/mbassy/ConcurrentSetTest.java @@ -9,13 +9,17 @@ import java.util.LinkedList; import java.util.Random; /** - * Created with IntelliJ IDEA. + * This test ensures the correct behaviour of the set implementation that is the building + * block of the subscription implementations used by the Mbassador message bus. + * + * It should behave exactly like other set implementations do and as such all tests are based + * on comparing the outcome of sequence of operations applied to a standard set implementation + * and the concurrent set. + * * @author bennidi - * Date: 11/12/12 - * Time: 3:02 PM - * To change this template use File | Settings | File Templates. + * Date: 11/12/12 */ -public class ConcurrentSetTest extends UnitTest{ +public class ConcurrentSetTest extends UnitTest { private int numberOfElements = 100000; @@ -23,27 +27,27 @@ public class ConcurrentSetTest extends UnitTest{ @Test - public void testIterator(){ - final HashSet distinct = new HashSet(); - - final ConcurrentSet target = new ConcurrentSet(); + public void testIteratorCleanup() { + final HashSet persistingCandidates = new HashSet(); + final ConcurrentSet testSet = new ConcurrentSet(); Random rand = new Random(); - for(int i=0;i < numberOfElements ; i++){ + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); - if(rand.nextInt() % 3 == 0){ - distinct.add(candidate); + if (rand.nextInt() % 3 == 0) { + persistingCandidates.add(candidate); } - target.add(candidate); + testSet.add(candidate); } + // this will remove all objects that have not been inserted into the set of persisting candidates runGC(); ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for(Object src : target){ + for (Object testObject : testSet) { // do nothing // just iterate to trigger automatic clean up System.currentTimeMillis(); @@ -51,139 +55,165 @@ public class ConcurrentSetTest extends UnitTest{ } }, numberOfThreads); - - - - for(Object tar : target){ - Assert.assertTrue(distinct.contains(tar)); + assertEquals(persistingCandidates.size(), testSet.size()); + for (Object test : testSet) { + assertTrue(persistingCandidates.contains(test)); } - } - @Test - public void testInsert(){ + @Test + public void testUniqueness() { final LinkedList duplicates = new LinkedList(); final HashSet distinct = new HashSet(); - final ConcurrentSet target = new ConcurrentSet(); + final ConcurrentSet testSet = new ConcurrentSet(); Random rand = new Random(); + // build set of distinct objects and list of duplicates Object candidate = new Object(); - for(int i=0;i < numberOfElements ; i++){ - if(rand.nextInt() % 3 == 0){ + for (int i = 0; i < numberOfElements; i++) { + if (rand.nextInt() % 3 == 0) { candidate = new Object(); } duplicates.add(candidate); distinct.add(candidate); } - + // insert all elements (containing duplicates) into the set ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for(Object src : duplicates){ - target.add(src); + for (Object src : duplicates) { + testSet.add(src); } } }, numberOfThreads); - pause(3000); - - - for(Object tar : target){ - Assert.assertTrue(distinct.contains(tar)); + // check that the control set and the test set contain the exact same elements + assertEquals(distinct.size(), testSet.size()); + for (Object uniqueObject : distinct) { + assertTrue(testSet.contains(uniqueObject)); } - for(Object src : distinct){ - Assert.assertTrue(target.contains(src)); + + } + + @Test + public void testPerformance(){ + final HashSet source = new HashSet(); + + final HashSet hashSet = new HashSet(); + + final ConcurrentSet concurrentSet = new ConcurrentSet(); + + for (int i = 0; i < 1000000; i++) { + source.add(new Object()); } - Assert.assertEquals(distinct.size(), target.size()); + + long start = System.currentTimeMillis(); + for(Object o: source){ + hashSet.add(o); + } + long duration = System.currentTimeMillis() - start; + System.out.println("Performance of HashSet for 1.000.000 object insertions " + duration); + + start = System.currentTimeMillis(); + for(Object o: source){ + concurrentSet.add(o); + } + duration = System.currentTimeMillis() - start; + System.out.println("Performance of ConcurrentSet for 1.000.000 object insertions " + duration); } - @Test - public void testRemove1(){ + public void testRemove2() { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final ConcurrentSet target = new ConcurrentSet(); - for(int i=0;i < numberOfElements ; i++){ + final ConcurrentSet testSet = new ConcurrentSet(); + // build set of distinct objects and mark a subset of those for removal + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); - if(i % 3 == 0){ + if (i % 3 == 0) { toRemove.add(candidate); } } - + // build the test set from the set of candidates ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for(Object src : source){ - target.add(src); + for (Object src : source) { + testSet.add(src); } } }, numberOfThreads); + // remove all candidates that have previously been marked for removal from the test set ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for(Object src : toRemove){ - target.remove(src); + for (Object src : toRemove) { + testSet.remove(src); } } }, numberOfThreads); - pause(3000); - - for(Object tar : target){ + // ensure that the test set does not contain any of the elements that have been removed from it + for (Object tar : testSet) { Assert.assertTrue(!toRemove.contains(tar)); } - - for(Object src : source){ - if(!toRemove.contains(src))Assert.assertTrue(target.contains(src)); + // ensure that the test set still contains all objects from the source set that have not been marked + // for removal + assertEquals(source.size() - toRemove.size(), testSet.size()); + for (Object src : source) { + if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); } } @Test - public void testRemove2(){ + public void testRemoval() { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final ConcurrentSet target = new ConcurrentSet(); - for(int i=0;i < numberOfElements ; i++){ + final ConcurrentSet testSet = new ConcurrentSet(); + // build set of candidates and mark subset for removal + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); - if(i % 3 == 0){ + if (i % 3 == 0) { toRemove.add(candidate); } } - + // build test set by adding the candidates + // and subsequently removing those marked for removal ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for(Object src : source){ - target.add(src); - if(toRemove.contains(src)) - target.remove(src); + for (Object src : source) { + testSet.add(src); + if (toRemove.contains(src)) + testSet.remove(src); } } }, numberOfThreads); - pause(3000); - - for(Object tar : target){ + // ensure that the test set does not contain any of the elements that have been removed from it + for (Object tar : testSet) { Assert.assertTrue(!toRemove.contains(tar)); } - - for(Object src : source){ - if(!toRemove.contains(src))Assert.assertTrue(target.contains(src)); + // ensure that the test set still contains all objects from the source set that have not been marked + // for removal + assertEquals(source.size() - toRemove.size(), testSet.size()); + for (Object src : source) { + if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); } } diff --git a/src/test/java/org/mbassy/FilterTest.java b/src/test/java/org/mbassy/FilterTest.java new file mode 100644 index 0000000..9fa3d90 --- /dev/null +++ b/src/test/java/org/mbassy/FilterTest.java @@ -0,0 +1,57 @@ +package org.mbassy; + +import org.junit.Test; +import org.mbassy.events.SubTestEvent; +import org.mbassy.events.TestEvent; +import org.mbassy.listener.Filter; +import org.mbassy.listener.IMessageFilter; +import org.mbassy.listener.Listener; +import org.mbassy.listeners.*; + +import java.util.List; + +/** + * Testing of filter functionality + * + * @author bennidi + * Date: 11/26/12 + */ +public class FilterTest extends UnitTest{ + + @Test + public void testSubclassFilter() throws Exception { + + MBassador bus = new MBassador(); + ListenerFactory listenerFactory = new ListenerFactory() + .create(100, FilteredMessageListener.class) + .create(100, Object.class) + .create(100, NonListeningBean.class); + + List listeners = listenerFactory.build(); + + // this will subscribe the listeners concurrently to the bus + TestUtil.setup(bus, listeners, 10); + + TestEvent event = new TestEvent(); + TestEvent subTestEvent = new SubTestEvent(); + + bus.post(event).now(); + bus.post(subTestEvent).now(); + + assertEquals(100, event.counter.get()); + assertEquals(0, subTestEvent.counter.get()); + + } + + + public static class FilteredMessageListener{ + + @Listener(filters = {@Filter(IMessageFilter.DontAllowSubtypes.class)}) + public void handleTestEvent(TestEvent event){ + event.counter.incrementAndGet(); + } + + + } + +} diff --git a/src/test/java/org/mbassy/MBassadorTest.java b/src/test/java/org/mbassy/MBassadorTest.java index bfb6f54..f4f8b7b 100644 --- a/src/test/java/org/mbassy/MBassadorTest.java +++ b/src/test/java/org/mbassy/MBassadorTest.java @@ -4,8 +4,11 @@ import org.junit.Test; import org.mbassy.events.SubTestEvent; import org.mbassy.events.TestEvent; import org.mbassy.listeners.*; +import org.mbassy.subscription.Subscription; import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -17,27 +20,62 @@ import java.util.concurrent.CopyOnWriteArrayList; */ public class MBassadorTest extends UnitTest { - + // this is a single threaded test for subscribing and unsubscribing of a single listener @Test public void testSubscribeSimple() throws InterruptedException { - MBassador bus = new MBassador(); + List listeners = new LinkedList(); int listenerCount = 1000; + // subscribe a number of listeners to the bus for (int i = 1; i <= listenerCount; i++) { EventingTestBean listener = new EventingTestBean(); NonListeningBean nonListener = new NonListeningBean(); + listeners.add(listener); + bus.subscribe(listener); bus.subscribe(nonListener); - assertTrue(bus.unsubscribe(listener)); - assertFalse(bus.unsubscribe(nonListener)); + + assertFalse(bus.unsubscribe(nonListener)); // these are not expected to be subscribed listeners assertFalse(bus.unsubscribe(new EventingTestBean())); } + + // check the generated subscriptions for existence of all previously subscribed valid listeners + Collection testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class); + assertEquals(1, testEventsubscriptions.size()); + assertEquals(listenerCount, getNumberOfSubscribedListeners(testEventsubscriptions)); + + Collection subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class); + assertEquals(3, subTestEventsubscriptions.size()); + assertEquals(3 * listenerCount, getNumberOfSubscribedListeners(subTestEventsubscriptions)); + + // unsubscribe the listeners + for(Object listener : listeners){ + assertTrue(bus.unsubscribe(listener)); // this listener is expected to exist + } + + // no listener should be left + testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class); + assertEquals(1, testEventsubscriptions.size()); + assertEquals(0, getNumberOfSubscribedListeners(testEventsubscriptions)); + + subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class); + assertEquals(3, subTestEventsubscriptions.size()); + assertEquals(0, getNumberOfSubscribedListeners(subTestEventsubscriptions)); + + } + + private int getNumberOfSubscribedListeners(Collection subscriptions) { + int listeners = 0; + for (Subscription sub : subscriptions) { + listeners += sub.size(); + } + return listeners; } @Test - public void testSubscribeConcurrent() throws Exception { + public void testConcurrentSubscription() throws Exception { MBassador bus = new MBassador(); ListenerFactory listenerFactory = new ListenerFactory() @@ -48,34 +86,37 @@ public class MBassadorTest extends UnitTest { .create(100, NonListeningBean.class); List listeners = listenerFactory.build(); + + // this will subscribe the listeners concurrently to the bus TestUtil.setup(bus, listeners, 10); - TestEvent event = new TestEvent(); - SubTestEvent subEvent = new SubTestEvent(); + // check the generated subscriptions for existence of all previously subscribed valid listeners + Collection testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class); + assertEquals(3, testEventsubscriptions.size()); + assertEquals(300, getNumberOfSubscribedListeners(testEventsubscriptions)); - bus.publish(event); - bus.publish(subEvent); - - pause(4000); - - assertEquals(300, event.counter.get()); - assertEquals(700, subEvent.counter.get()); + Collection subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class); + assertEquals(10, subTestEventsubscriptions.size()); + assertEquals(1000, getNumberOfSubscribedListeners(subTestEventsubscriptions)); } @Test - public void testAsynchronous() throws InterruptedException { + public void testAsynchronousMessagePublication() throws Exception { MBassador bus = new MBassador(); - int listenerCount = 1000; - List persistentReferences = new ArrayList(); + ListenerFactory listenerFactory = new ListenerFactory() + .create(100, EventingTestBean.class) + .create(100, EventingTestBean2.class) + .create(100, EventingTestBean3.class) + .create(100, Object.class) + .create(100, NonListeningBean.class); - for (int i = 1; i <= listenerCount; i++) { - EventingTestBean bean = new EventingTestBean(); - persistentReferences.add(bean); - bus.subscribe(bean); - } + List listeners = listenerFactory.build(); + + // this will subscribe the listeners concurrently to the bus + TestUtil.setup(bus, listeners, 10); TestEvent event = new TestEvent(); TestEvent subEvent = new SubTestEvent(); @@ -85,64 +126,64 @@ public class MBassadorTest extends UnitTest { pause(2000); - assertTrue(event.counter.get() == 1000); - assertTrue(subEvent.counter.get() == 1000 * 2); + assertEquals(300, event.counter.get()); + assertEquals(700, subEvent.counter.get()); } @Test - public void testSynchronous() throws InterruptedException { + public void testSynchronousMessagePublication() throws Exception { MBassador bus = new MBassador(); - int listenerCount = 10; - List persistentReferences = new ArrayList(); - for (int i = 1; i <= listenerCount; i++) { + ListenerFactory listenerFactory = new ListenerFactory() + .create(100, EventingTestBean.class) + .create(100, EventingTestBean2.class) + .create(100, EventingTestBean3.class) + .create(100, Object.class) + .create(100, NonListeningBean.class); + List listeners = listenerFactory.build(); - EventingTestBean bean = new EventingTestBean(); - persistentReferences.add(bean); - bus.subscribe(bean); + // this will subscribe the listeners concurrently to the bus + TestUtil.setup(bus, listeners, 10); - TestEvent event = new TestEvent(); - TestEvent subEvent = new SubTestEvent(); + TestEvent event = new TestEvent(); + TestEvent subEvent = new SubTestEvent(); - bus.publish(event); - bus.publish(subEvent); + bus.publish(event); + bus.publish(subEvent); - assertEquals(i, event.counter.get()); + pause(2000); - pause(50); - - assertEquals(i * 2, subEvent.counter.get()); - - } + assertEquals(300, event.counter.get()); + assertEquals(700, subEvent.counter.get()); } @Test - public void testConcurrentPublication() { - final MBassador bus = new MBassador(); - final int listenerCount = 100; - final int concurrency = 20; + public void testConcurrentMixedMessagePublication() throws Exception { final CopyOnWriteArrayList testEvents = new CopyOnWriteArrayList(); final CopyOnWriteArrayList subtestEvents = new CopyOnWriteArrayList(); - final CopyOnWriteArrayList persistentReferences = new CopyOnWriteArrayList(); + final int eventLoopsPerTHread = 100; + + + final MBassador bus = new MBassador(); + ListenerFactory listenerFactory = new ListenerFactory() + .create(100, EventingTestBean.class) + .create(100, EventingTestBean2.class) + .create(100, EventingTestBean3.class) + .create(100, Object.class) + .create(100, NonListeningBean.class); + + List listeners = listenerFactory.build(); + + // this will subscribe the listeners concurrently to the bus + TestUtil.setup(bus, listeners, 10); ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - for (int i = 0; i < listenerCount; i++) { - EventingTestBean bean = new EventingTestBean(); - persistentReferences.add(bean); - bus.subscribe(bean); - } - } - }, concurrency); - - ConcurrentExecutor.runConcurrent(new Runnable() { - @Override - public void run() { - for (int i = 0; i < listenerCount; i++) { + for (int i = 0; i < eventLoopsPerTHread; i++) { TestEvent event = new TestEvent(); SubTestEvent subEvent = new SubTestEvent(); testEvents.add(event); @@ -152,16 +193,16 @@ public class MBassadorTest extends UnitTest { bus.publish(subEvent); } } - }, concurrency); + }, 10); pause(3000); for (TestEvent event : testEvents) { - assertEquals(listenerCount * concurrency, event.counter.get()); + assertEquals(300, event.counter.get()); } for (SubTestEvent event : subtestEvents) { - assertEquals(listenerCount * concurrency * 2, event.counter.get()); + assertEquals(700, event.counter.get()); } } diff --git a/src/test/java/org/mbassy/listeners/EventingTestBean.java b/src/test/java/org/mbassy/listeners/EventingTestBean.java index 191582a..1ed6bc4 100644 --- a/src/test/java/org/mbassy/listeners/EventingTestBean.java +++ b/src/test/java/org/mbassy/listeners/EventingTestBean.java @@ -3,8 +3,8 @@ package org.mbassy.listeners; import org.mbassy.events.SubTestEvent; import org.mbassy.events.TestEvent; import org.mbassy.listener.Filter; +import org.mbassy.listener.IMessageFilter; import org.mbassy.listener.Listener; -import org.mbassy.listener.MessageFilter; import org.mbassy.listener.Mode; /** @@ -33,7 +33,7 @@ public class EventingTestBean { @Listener( priority = 10, dispatch = Mode.Synchronous, - filters = {@Filter(MessageFilter.None.class), @Filter(MessageFilter.All.class)}) + filters = {@Filter(IMessageFilter.None.class), @Filter(IMessageFilter.All.class)}) public void handleFiltered(SubTestEvent event) { event.counter.incrementAndGet(); } diff --git a/src/test/java/org/mbassy/listeners/NonListeningBean.java b/src/test/java/org/mbassy/listeners/NonListeningBean.java index 3ae7b21..2905590 100644 --- a/src/test/java/org/mbassy/listeners/NonListeningBean.java +++ b/src/test/java/org/mbassy/listeners/NonListeningBean.java @@ -7,7 +7,7 @@ import org.mbassy.listener.Mode; /** * This bean overrides all the handlers defined in its superclass. Since it does not specify any annotations - * it should be considered a message lister + * it should not be considered a message listener * * @author bennidi * Date: 11/22/12