diff --git a/README.md b/README.md index 9cc6536..36a0313 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,16 @@ to avoid confusion and misunderstanding.

Release Notes

+

1.1.10

+ + Fixed broken sort order of prioritized handlers (see #58) + + Addressed issue #63 by making the constructor of `MessageHandler` use a map of properties and by replacing dependencies to + all MBassador specific annotations with Java primitives and simple interfaces + + Small refactorings (moved stuff around to have cleaner packaging) + + MessageBus.getExecutor() is now deprecated and will be removed with next release -> use the runtime to get access to it. + + Introduced BusFactory with convenience methods for creating bus instances for different message dispatching scenarios like + asynchronous FIFO (asynchronous message publications guaranteed to be delivered in the order they occurred) + + Renamed runtime property of `BusRuntime` "handler.async-service" to "handler.async.executor" +

1.1.9

+ Fixed memory leak reported in issue #53 diff --git a/src/docs/TODO.md b/src/docs/TODO.md index 7270c7e..4494197 100644 --- a/src/docs/TODO.md +++ b/src/docs/TODO.md @@ -18,5 +18,6 @@ Add code examples Javadoc of main classes Describe 1-Thread FIFO scheme with async dispatch Explain how MBassador can be extended easily using delegation Refer to Spring integration component +Creating bus hierarchies How to make sender part of the message publication How to add global filtering by means of delegation diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index e74be6f..8a04a8c 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -1,9 +1,10 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.IPublicationErrorHandler; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.common.DeadMessage; +import net.engio.mbassy.bus.common.PubSubSupport; import net.engio.mbassy.bus.config.ISyncBusConfiguration; -import net.engio.mbassy.common.DeadMessage; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.SubscriptionManager; @@ -17,7 +18,7 @@ import java.util.List; * * @param */ -public abstract class AbstractPubSubSupport implements PubSubSupport{ +public abstract class AbstractPubSubSupport implements PubSubSupport { // this handler will receive all errors that occur during message dispatch or message handling @@ -32,7 +33,7 @@ public abstract class AbstractPubSubSupport implements PubSubSupport{ public AbstractPubSubSupport(ISyncBusConfiguration configuration) { this.runtime = new BusRuntime(this); - this.runtime.add("error.handlers", getRegisteredErrorHandlers()); + this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers()); this.subscriptionManager = configuration.getSubscriptionManagerProvider() .createManager(configuration.getMetadataReader(), configuration.getSubscriptionFactory(), runtime); diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index d6b7e27..1a2cba6 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -1,7 +1,8 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.IBusConfiguration; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand; import java.util.ArrayList; @@ -12,7 +13,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** - * The base class for all async message bus implementations. + * The base class for all message bus implementations with support for asynchronous message dispatch * * @param * @param

@@ -32,7 +33,7 @@ public abstract class AbstractSyncAsyncMessageBus(configuration.getNumberOfMessageDispatchers()); initDispatcherThreads(configuration); @@ -61,6 +62,7 @@ public abstract class AbstractSyncAsyncMessageBus properties = new HashMap(); diff --git a/src/main/java/net/engio/mbassy/bus/IMBassador.java b/src/main/java/net/engio/mbassy/bus/IMBassador.java deleted file mode 100644 index b738142..0000000 --- a/src/main/java/net/engio/mbassy/bus/IMBassador.java +++ /dev/null @@ -1,19 +0,0 @@ -package net.engio.mbassy.bus; - -import net.engio.mbassy.bus.publication.SyncAsyncPostCommand; - -import java.util.concurrent.TimeUnit; - -/** - * Created with IntelliJ IDEA. - * User: benjamin - * Date: 8/21/13 - * Time: 11:05 AM - * To change this template use File | Settings | File Templates. - */ -public interface IMBassador extends IMessageBus> { - - MessagePublication publishAsync(T message); - - MessagePublication publishAsync(T message, long timeout, TimeUnit unit); -} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index ee4206f..e332211 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -1,25 +1,24 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.BusConfiguration; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.bus.publication.SyncAsyncPostCommand; import java.util.concurrent.TimeUnit; -public class MBassador extends AbstractSyncAsyncMessageBus> implements IMBassador { +public class MBassador extends AbstractSyncAsyncMessageBus> implements IMessageBus> { public MBassador(BusConfiguration configuration) { super(configuration); } - @Override public MessagePublication publishAsync(T message) { return addAsynchronousPublication(createMessagePublication(message)); } - @Override public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) { return addAsynchronousPublication(createMessagePublication(message), timeout, unit); } diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java index 8eae03a..75299fc 100644 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/MessagePublication.java @@ -1,7 +1,7 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.common.DeadMessage; -import net.engio.mbassy.common.FilteredMessage; +import net.engio.mbassy.bus.common.DeadMessage; +import net.engio.mbassy.bus.common.FilteredMessage; import net.engio.mbassy.subscription.Subscription; import java.util.Collection; @@ -79,11 +79,11 @@ public class MessagePublication { public boolean isDeadEvent() { - return DeadMessage.class.isAssignableFrom(message.getClass()); + return DeadMessage.class.equals(message.getClass()); } public boolean isFilteredEvent() { - return FilteredMessage.class.isAssignableFrom(message.getClass()); + return FilteredMessage.class.equals(message.getClass()); } public Object getMessage() { diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index 9dfb911..f22c271 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -1,7 +1,8 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.common.ISyncMessageBus; import net.engio.mbassy.bus.config.ISyncBusConfiguration; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.bus.publication.IPublicationCommand; /** @@ -9,7 +10,7 @@ import net.engio.mbassy.bus.publication.IPublicationCommand; * will not create any new threads. * */ -public class SyncMessageBus extends AbstractPubSubSupport implements ISyncMessageBus{ +public class SyncMessageBus extends AbstractPubSubSupport implements ISyncMessageBus { public SyncMessageBus(ISyncBusConfiguration configuration) { diff --git a/src/main/java/net/engio/mbassy/common/DeadMessage.java b/src/main/java/net/engio/mbassy/bus/common/DeadMessage.java similarity index 67% rename from src/main/java/net/engio/mbassy/common/DeadMessage.java rename to src/main/java/net/engio/mbassy/bus/common/DeadMessage.java index abd83d4..9a73864 100644 --- a/src/main/java/net/engio/mbassy/common/DeadMessage.java +++ b/src/main/java/net/engio/mbassy/bus/common/DeadMessage.java @@ -1,7 +1,7 @@ -package net.engio.mbassy.common; +package net.engio.mbassy.bus.common; /** - * The DeadEvent is delivered to all subscribed handlers (if any) whenever no message + * The dead message event is published whenever no message * handlers could be found for a given message publication. * * @author bennidi diff --git a/src/main/java/net/engio/mbassy/bus/ErrorHandlingSupport.java b/src/main/java/net/engio/mbassy/bus/common/ErrorHandlingSupport.java similarity index 89% rename from src/main/java/net/engio/mbassy/bus/ErrorHandlingSupport.java rename to src/main/java/net/engio/mbassy/bus/common/ErrorHandlingSupport.java index 2da0570..4ac9fe9 100644 --- a/src/main/java/net/engio/mbassy/bus/ErrorHandlingSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/ErrorHandlingSupport.java @@ -1,6 +1,6 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; -import net.engio.mbassy.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; import java.util.Collection; diff --git a/src/main/java/net/engio/mbassy/common/FilteredMessage.java b/src/main/java/net/engio/mbassy/bus/common/FilteredMessage.java similarity index 55% rename from src/main/java/net/engio/mbassy/common/FilteredMessage.java rename to src/main/java/net/engio/mbassy/bus/common/FilteredMessage.java index 65a284f..488ba1a 100644 --- a/src/main/java/net/engio/mbassy/common/FilteredMessage.java +++ b/src/main/java/net/engio/mbassy/bus/common/FilteredMessage.java @@ -1,14 +1,14 @@ -package net.engio.mbassy.common; +package net.engio.mbassy.bus.common; /** - * A filtered event is published when there have been matching subscriptions for a given + * A filtered message event is published when there have been matching subscriptions for a given * message publication but configured filters prevented the message from being delivered to * any of the handlers. * * @author bennidi * Date: 3/1/13 */ -public class FilteredMessage extends PublicationEvent { +public final class FilteredMessage extends PublicationEvent { public FilteredMessage(Object event) { diff --git a/src/main/java/net/engio/mbassy/bus/GenericMessagePublicationSupport.java b/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java similarity index 96% rename from src/main/java/net/engio/mbassy/bus/GenericMessagePublicationSupport.java rename to src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java index 01687a9..c79448c 100644 --- a/src/main/java/net/engio/mbassy/bus/GenericMessagePublicationSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; import net.engio.mbassy.bus.publication.IPublicationCommand; diff --git a/src/main/java/net/engio/mbassy/bus/IMessageBus.java b/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java similarity index 95% rename from src/main/java/net/engio/mbassy/bus/IMessageBus.java rename to src/main/java/net/engio/mbassy/bus/common/IMessageBus.java index dd1bca2..316053a 100644 --- a/src/main/java/net/engio/mbassy/bus/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand; @@ -70,14 +70,16 @@ public interface IMessageBus * Get the executor service that is used for asynchronous message publications. * The executor is passed to the message bus at creation time. * + * Note: The executor can be obtained from the run time. See * @return */ + @Deprecated Executor getExecutor(); /** * Check whether any asynchronous message publications are pending to be processed * - * @return + * @return true if any unfinished message publications are found */ boolean hasPendingMessages(); diff --git a/src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java similarity index 88% rename from src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java rename to src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java index 2836686..7879772 100644 --- a/src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; import net.engio.mbassy.bus.publication.IPublicationCommand; diff --git a/src/main/java/net/engio/mbassy/bus/PubSubSupport.java b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java similarity index 97% rename from src/main/java/net/engio/mbassy/bus/PubSubSupport.java rename to src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java index e1ea998..f0cebc1 100644 --- a/src/main/java/net/engio/mbassy/bus/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; /** * This interface defines the very basic message publication semantics according to the publish subscribe pattern. diff --git a/src/main/java/net/engio/mbassy/common/PublicationEvent.java b/src/main/java/net/engio/mbassy/bus/common/PublicationEvent.java similarity index 69% rename from src/main/java/net/engio/mbassy/common/PublicationEvent.java rename to src/main/java/net/engio/mbassy/bus/common/PublicationEvent.java index a3eb191..e1f81f2 100644 --- a/src/main/java/net/engio/mbassy/common/PublicationEvent.java +++ b/src/main/java/net/engio/mbassy/bus/common/PublicationEvent.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.common; +package net.engio.mbassy.bus.common; /** * A wrapped event is created when various conditions are matched (these depend on the concrete @@ -9,13 +9,13 @@ package net.engio.mbassy.common; */ public abstract class PublicationEvent { - private Object event; + private Object relatedMessage; public PublicationEvent(Object message) { - this.event = message; + this.relatedMessage = message; } public Object getMessage() { - return event; + return relatedMessage; } } diff --git a/src/main/java/net/engio/mbassy/bus/RuntimeProvider.java b/src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java similarity index 69% rename from src/main/java/net/engio/mbassy/bus/RuntimeProvider.java rename to src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java index 5a3036c..6ae2ed9 100644 --- a/src/main/java/net/engio/mbassy/bus/RuntimeProvider.java +++ b/src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java @@ -1,4 +1,6 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy.bus.common; + +import net.engio.mbassy.bus.BusRuntime; /** * Each message bus provides a runtime object to access its dynamic features and runtime configuration. diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java index 43edfef..946bb00 100644 --- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java @@ -175,8 +175,9 @@ public class BusConfiguration implements IBusConfiguration { return metadataReader; } - public void setMetadataReader(MetadataReader metadataReader) { + public BusConfiguration setMetadataReader(MetadataReader metadataReader) { this.metadataReader = metadataReader; + return this; } @Override @@ -193,7 +194,8 @@ public class BusConfiguration implements IBusConfiguration { return subscriptionManagerProvider; } - public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { + public BusConfiguration setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { this.subscriptionManagerProvider = subscriptionManagerProvider; + return this; } } diff --git a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java index 4922cfc..2629945 100644 --- a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java @@ -5,8 +5,16 @@ import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; +/** + * The configuration options for the synchronous message bus {@link net.engio.mbassy.bus.SyncMessageBus} + */ public interface ISyncBusConfiguration { + /** + * The message publication factory is used to wrap a published message + * and while it is being processed + * @return The factory to be used by the bus to create the publications + */ MessagePublication.Factory getMessagePublicationFactory(); MetadataReader getMetadataReader(); diff --git a/src/main/java/net/engio/mbassy/IPublicationErrorHandler.java b/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java similarity index 97% rename from src/main/java/net/engio/mbassy/IPublicationErrorHandler.java rename to src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java index 362c41b..1bbfd31 100644 --- a/src/main/java/net/engio/mbassy/IPublicationErrorHandler.java +++ b/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java @@ -1,4 +1,4 @@ -package net.engio.mbassy; +package net.engio.mbassy.bus.error; /** * Publication error handlers are provided with a publication error every time an diff --git a/src/main/java/net/engio/mbassy/MessageBusException.java b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java similarity index 92% rename from src/main/java/net/engio/mbassy/MessageBusException.java rename to src/main/java/net/engio/mbassy/bus/error/MessageBusException.java index c5a5473..9637836 100644 --- a/src/main/java/net/engio/mbassy/MessageBusException.java +++ b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java @@ -1,4 +1,4 @@ -package net.engio.mbassy; +package net.engio.mbassy.bus.error; /** * Todo: Add javadoc diff --git a/src/main/java/net/engio/mbassy/common/MissingPropertyException.java b/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java similarity index 93% rename from src/main/java/net/engio/mbassy/common/MissingPropertyException.java rename to src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java index 177c4cd..08faf21 100644 --- a/src/main/java/net/engio/mbassy/common/MissingPropertyException.java +++ b/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.common; +package net.engio.mbassy.bus.error; /** * This exception is thrown when a property value that is unavailable at runtime is accessed. diff --git a/src/main/java/net/engio/mbassy/PublicationError.java b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java similarity index 99% rename from src/main/java/net/engio/mbassy/PublicationError.java rename to src/main/java/net/engio/mbassy/bus/error/PublicationError.java index 3af4b2f..0903f0f 100644 --- a/src/main/java/net/engio/mbassy/PublicationError.java +++ b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java @@ -1,4 +1,4 @@ -package net.engio.mbassy; +package net.engio.mbassy.bus.error; import net.engio.mbassy.bus.MessagePublication; diff --git a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java index 5297eae..7b80830 100644 --- a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.BusRuntime; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import java.util.concurrent.ExecutorService; @@ -19,7 +20,7 @@ public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAw public AsynchronousHandlerInvocation(IHandlerInvocation delegate) { super(delegate.getContext()); this.delegate = delegate; - this.executor = delegate.getContext().getRuntime().get("handler.async-service"); + this.executor = delegate.getContext().getRuntime().get(BusRuntime.Properties.AsynchronousHandlerExecutor); } /** diff --git a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java index d5b4dc0..4075c95 100644 --- a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java @@ -1,7 +1,7 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.IPublicationErrorHandler; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import net.engio.mbassy.subscription.SubscriptionContext; diff --git a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java index dc6b374..796c3d6 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.SubscriptionContext; import java.lang.reflect.InvocationTargetException; diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/listener/MessageHandler.java index 69091a2..4ce8b39 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandler.java @@ -3,30 +3,80 @@ package net.engio.mbassy.listener; import net.engio.mbassy.dispatch.HandlerInvocation; import java.lang.reflect.Method; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; /** * Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains - * the handler defines the message listener and more generally, any class containing a message handler in its class hierarchy - * defines a message listener. + * the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy + * defines such a message listener. * * @author bennidi * Date: 11/14/12 */ public class MessageHandler { + public static final class Properties{ + + public static final String HandlerMethod = "handler"; + public static final String InvocationMode = "invocationMode"; + public static final String Filter = "filter"; + public static final String Enveloped = "envelope"; + public static final String HandledMessages = "messages"; + public static final String IsSynchronized = "synchronized"; + public static final String Listener = "listener"; + public static final String AcceptSubtypes = "subtypes"; + public static final String Priority = "priority"; + public static final String Invocation = "invocation"; + + /** + * Create the property map for the {@link MessageHandler} constructor using the default objects. + * + * @param handler The handler annotated method of the listener + * @param handlerConfig The annotation that configures the handler + * @param filter The set of preconfigured filters if any + * @param listenerConfig The listener metadata + * @return A map of properties initialized from the given parameters that will conform to the requirements of the + * {@link MessageHandler} constructor. See {@see MessageHandler.validate()} for more details. + */ + public static final Map Create(Method handler, Handler handlerConfig, IMessageFilter[] filter, MessageListener listenerConfig){ + if(handler == null){ + throw new IllegalArgumentException("The message handler configuration may not be null"); + } + net.engio.mbassy.listener.Enveloped enveloped = handler.getAnnotation(Enveloped.class); + Class[] handledMessages = enveloped != null + ? enveloped.messages() + : handler.getParameterTypes(); + handler.setAccessible(true); + Map properties = new HashMap(); + properties.put(HandlerMethod, handler); + properties.put(Filter, filter != null ? filter : new IMessageFilter[]{}); + properties.put(Priority, handlerConfig.priority()); + properties.put(Invocation, handlerConfig.invocation()); + properties.put(InvocationMode, handlerConfig.delivery()); + properties.put(Enveloped, enveloped != null); + properties.put(AcceptSubtypes, !handlerConfig.rejectSubtypes()); + properties.put(Listener, listenerConfig); + properties.put(IsSynchronized, handler.getAnnotation(Synchronized.class) != null); + properties.put(HandledMessages, handledMessages); + return properties; + } + } + + private final Method handler; private final IMessageFilter[] filter; - private final Handler handlerConfig; + private final int priority; - private final boolean isAsynchronous; + private final Class invocation; - private final Enveloped envelope; + private final Invoke invocationMode; - private final List> handledMessages = new LinkedList>(); + private final boolean isEnvelope; + + private final Class[] handledMessages; private final boolean acceptsSubtypes; @@ -34,29 +84,40 @@ public class MessageHandler { private final boolean isSynchronized; - private Class listeningClass; + public MessageHandler(Map properties){ + super(); + validate(properties); + this.handler = (Method)properties.get(Properties.HandlerMethod); + this.filter = (IMessageFilter[])properties.get(Properties.Filter); + this.priority = (Integer)properties.get(Properties.Priority); + this.invocation = (Class)properties.get(Properties.Invocation); + this.invocationMode = (Invoke)properties.get(Properties.InvocationMode); + this.isEnvelope = (Boolean)properties.get(Properties.Enveloped); + this.acceptsSubtypes = (Boolean)properties.get(Properties.AcceptSubtypes); + this.listenerConfig = (MessageListener)properties.get(Properties.Listener); + this.isSynchronized = (Boolean)properties.get(Properties.IsSynchronized); + this.handledMessages = (Class[])properties.get(Properties.HandledMessages); + } + + private void validate(Map properties){ + Object[][] expectedProperties = new Object[][]{ + new Object[]{Properties.HandlerMethod, Method.class }, + new Object[]{Properties.Priority, Integer.class }, + new Object[]{Properties.Invocation, Class.class }, + new Object[]{Properties.Filter, IMessageFilter[].class }, + new Object[]{Properties.Enveloped, Boolean.class }, + new Object[]{Properties.HandledMessages, Class[].class }, + new Object[]{Properties.IsSynchronized, Boolean.class }, + new Object[]{Properties.Listener, MessageListener.class }, + new Object[]{Properties.AcceptSubtypes, Boolean.class } + }; + for(Object[] property : expectedProperties){ + if (properties.get(property[0]) == null || !((Class)property[1]).isAssignableFrom(properties.get(property[0]).getClass())) + throw new IllegalArgumentException("Property " + property[0] + " was expected to be not null and of type " + property[1] + + " but was: " + properties.get(property[0])); + } - public MessageHandler(Method handler, IMessageFilter[] filter, Handler handlerConfig, MessageListener listenerConfig) { - if(handler == null || handlerConfig == null){ - throw new IllegalArgumentException("The message handler configuration may not be null"); - } - this.handler = handler; - this.filter = filter; - this.handlerConfig = handlerConfig; - this.isAsynchronous = handlerConfig.delivery().equals(Invoke.Asynchronously); - this.envelope = handler.getAnnotation(Enveloped.class); - this.acceptsSubtypes = !handlerConfig.rejectSubtypes(); - this.listenerConfig = listenerConfig; - this.isSynchronized = handler.getAnnotation(Synchronized.class) != null; - if (this.envelope != null) { - for(Class messageType : envelope.messages()){ - handledMessages.add(messageType); - } - } else { - handledMessages.add(handler.getParameterTypes()[0]); - } - this.handler.setAccessible(true); } public boolean isSynchronized(){ @@ -72,15 +133,15 @@ public class MessageHandler { } public boolean isAsynchronous() { - return isAsynchronous; + return invocationMode.equals(Invoke.Asynchronously); } public boolean isFiltered() { - return filter != null && filter.length > 0; + return filter.length > 0; } public int getPriority() { - return handlerConfig.priority(); + return priority; } public Method getHandler() { @@ -91,16 +152,16 @@ public class MessageHandler { return filter; } - public List> getHandledMessages() { + public Class[] getHandledMessages() { return handledMessages; } public boolean isEnveloped() { - return envelope != null; + return isEnvelope; } public Class getHandlerInvocation(){ - return handlerConfig.invocation(); + return invocation; } public boolean handlesMessage(Class messageType) { @@ -119,8 +180,4 @@ public class MessageHandler { return acceptsSubtypes; } - - public boolean isEnabled() { - return handlerConfig.enabled(); - } } diff --git a/src/main/java/net/engio/mbassy/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/listener/MetadataReader.java index 2f7432a..dba0374 100644 --- a/src/main/java/net/engio/mbassy/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/listener/MetadataReader.java @@ -76,8 +76,9 @@ public class MetadataReader { } Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target); // if a handler is overwritten it inherits the configuration of its parent method - MessageHandler handlerMetadata = new MessageHandler(overriddenHandler == null ? handler : overriddenHandler, - getFilter(handlerConfig), handlerConfig, listenerMetadata); + Map handlerProperties = MessageHandler.Properties.Create(overriddenHandler == null ? handler : overriddenHandler, + handlerConfig, getFilter(handlerConfig), listenerMetadata); + MessageHandler handlerMetadata = new MessageHandler(handlerProperties); listenerMetadata.addHandler(handlerMetadata); } diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index 544d302..a123259 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -5,7 +5,6 @@ import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.dispatch.IMessageDispatcher; import java.util.Comparator; -import java.util.List; import java.util.UUID; /** @@ -64,7 +63,7 @@ public class Subscription { return context.getHandlerMetadata().handlesMessage(messageType); } - public List> getHandledMessageTypes(){ + public Class[] getHandledMessageTypes(){ return context.getHandlerMetadata().getHandledMessages(); } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java index b9c7806..458378b 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java @@ -1,8 +1,8 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.bus.BusRuntime; -import net.engio.mbassy.bus.RuntimeProvider; +import net.engio.mbassy.bus.common.RuntimeProvider; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.listener.MessageHandler; import java.util.Collection; diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java index 711d6d7..4cc0576 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java @@ -1,8 +1,8 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.IPublicationErrorHandler; -import net.engio.mbassy.MessageBusException; import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.MessageBusException; import net.engio.mbassy.common.StrongConcurrentSet; import net.engio.mbassy.common.WeakConcurrentSet; import net.engio.mbassy.dispatch.*; @@ -18,11 +18,9 @@ import java.util.Collection; */ public class SubscriptionFactory { - private static final String ErrorHandlers = "error.handlers"; - public Subscription createSubscription(BusRuntime runtime, MessageHandler handlerMetadata) throws MessageBusException{ try { - Collection errorHandlers = runtime.get(ErrorHandlers); + Collection errorHandlers = runtime.get(BusRuntime.Properties.ErrorHandlers); SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers); IHandlerInvocation invocation = buildInvocationForHandler(context); IMessageDispatcher dispatcher = buildDispatcher(context, invocation); diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 9d36099..0ab8b16 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -21,7 +21,8 @@ import org.junit.runners.Suite; MethodDispatchTest.class, DeadMessageTest.class, SynchronizedHandlerTest.class, - SubscriptionManagerTest.class + SubscriptionManagerTest.class, + AsyncFIFOBusTest.class }) public class AllTests { } diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java new file mode 100644 index 0000000..4ad56c7 --- /dev/null +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -0,0 +1,162 @@ +package net.engio.mbassy; + +import net.engio.mbassy.bus.BusFactory; +import net.engio.mbassy.bus.common.IMessageBus; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; + +/** + * + * @author bennidi + * Date: 3/30/14 + */ +public class AsyncFIFOBusTest extends MessageBusTest { + + @Test + public void testSingleThreadedSyncFIFO(){ + // create a fifo bus with 1000 concurrently subscribed listeners + IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); + + List listeners = new LinkedList(); + for(int i = 0; i < 1000 ; i++){ + SyncListener listener = new SyncListener(); + listeners.add(listener); + fifoBUs.subscribe(listener); + } + + // prepare set of messages in increasing order + int[] messages = new int[1000]; + for(int i = 0; i < messages.length ; i++){ + messages[i] = i; + } + // publish in ascending order + for(Integer message : messages) + fifoBUs.post(message).asynchronously(); + + while(fifoBUs.hasPendingMessages()) + pause(1000); + + for(SyncListener listener : listeners){ + assertEquals(messages.length, listener.receivedSync.size()); + for(int i=0; i < messages.length; i++){ + assertEquals(messages[i], listener.receivedSync.get(i)); + } + } + + } + + // NOTE: Can fail due to timing issues. + @Test + public void testSingleThreadedSyncAsyncFIFO(){ + // create a fifo bus with 1000 concurrently subscribed listeners + IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); + + List listeners = new LinkedList(); + for(int i = 0; i < 1000 ; i++){ + SyncAsyncListener listener = new SyncAsyncListener(); + listeners.add(listener); + fifoBUs.subscribe(listener); + } + + // prepare set of messages in increasing order + int[] messages = new int[1000]; + for(int i = 0; i < messages.length ; i++){ + messages[i] = i; + } + // publish in ascending order + for(Integer message : messages) + fifoBUs.post(message).asynchronously(); + + while(fifoBUs.hasPendingMessages()) + pause(2000); + + for(SyncAsyncListener listener : listeners){ + assertEquals(messages.length, listener.receivedSync.size()); + assertEquals(listener.receivedSync.size(), listener.receivedAsync.size()); + for(int i=0; i < listener.receivedAsync.size(); i++){ + assertEquals(messages[i], listener.receivedSync.get(i)); + // sync and async in same order + assertEquals(listener.receivedSync.get(i), listener.receivedAsync.get(i)); + } + } + + } + + /* + @Test + public void testMultiThreadedSyncFIFO(){ + // create a fifo bus with 1000 concurrently subscribed listeners + final IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); + + List listeners = new LinkedList(); + for(int i = 0; i < 1000 ; i++){ + SyncListener listener = new SyncListener(); + listeners.add(listener); + fifoBUs.subscribe(listener); + } + + // prepare set of messages in increasing order + final int[] messages = new int[10000]; + for(int i = 0; i < messages.length ; i++){ + messages[i] = i; + } + final AtomicInteger messageIndex = new AtomicInteger(0); + // publish in ascending order + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + int idx; + while((idx = messageIndex.getAndIncrement()) < messages.length){ + fifoBUs.post(messages[idx]).asynchronously(); + } + } + }, 5); + + while(fifoBUs.hasPendingMessages()) + pause(1000); + + for(SyncListener listener : listeners){ + assertEquals(messages.length, listener.receivedSync.size()); + for(int i=0; i < messages.length; i++){ + assertEquals(messages[i], listener.receivedSync.get(i)); + } + } + + } */ + + + + public static class SyncListener { + + private List receivedSync = new LinkedList(); + + @Handler + public void handleSync(Integer message){ + receivedSync.add(message); + } + + } + + public static class SyncAsyncListener { + + private List receivedSync = new LinkedList(); + private List receivedAsync = new LinkedList(); + + @Handler + public void handleSync(Integer message){ + receivedSync.add(message); + } + + @Handler(delivery = Invoke.Asynchronously) + public void handleASync(Integer message){ + receivedAsync.add(message); + } + + } + +} diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index fbabfaf..bab75c9 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -1,11 +1,14 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.common.*; +import net.engio.mbassy.bus.common.DeadMessage; +import net.engio.mbassy.bus.config.BusConfiguration; +import net.engio.mbassy.common.ConcurrentExecutor; +import net.engio.mbassy.common.ListenerFactory; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.common.TestUtil; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listeners.IMessageListener; -import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.listeners.MessagesListener; import net.engio.mbassy.listeners.ObjectListener; import org.junit.Before; diff --git a/src/test/java/net/engio/mbassy/FilterTest.java b/src/test/java/net/engio/mbassy/FilterTest.java index b113e1d..7f756f9 100644 --- a/src/test/java/net/engio/mbassy/FilterTest.java +++ b/src/test/java/net/engio/mbassy/FilterTest.java @@ -1,8 +1,12 @@ package net.engio.mbassy; import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.common.DeadMessage; +import net.engio.mbassy.bus.common.FilteredMessage; import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.common.*; +import net.engio.mbassy.common.ListenerFactory; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.common.TestUtil; import net.engio.mbassy.listener.*; import net.engio.mbassy.messages.SubTestMessage; import net.engio.mbassy.messages.TestMessage; diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java index 106a9ad..eb993c2 100644 --- a/src/test/java/net/engio/mbassy/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -1,7 +1,9 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.config.BusConfiguration; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.*; import net.engio.mbassy.listeners.*; import net.engio.mbassy.messages.MessageTypes; diff --git a/src/test/java/net/engio/mbassy/MethodDispatchTest.java b/src/test/java/net/engio/mbassy/MethodDispatchTest.java index dd4881f..5ef0587 100644 --- a/src/test/java/net/engio/mbassy/MethodDispatchTest.java +++ b/src/test/java/net/engio/mbassy/MethodDispatchTest.java @@ -1,6 +1,6 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.listener.Handler; diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java index ed6cc88..0412ea1 100644 --- a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -214,8 +214,8 @@ public class SubscriptionManagerTest extends AssertSupport { private BusRuntime mockedRuntime(){ return new BusRuntime(null) - .add("error.handlers", Collections.EMPTY_SET) - .add("handler.async-service", null); + .add(BusRuntime.Properties.ErrorHandlers, Collections.EMPTY_SET) + .add(BusRuntime.Properties.AsynchronousHandlerExecutor, null); } private ListenerFactory listeners(Class ...listeners){ diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index d8733be..2b2c818 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -1,10 +1,11 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.ISyncMessageBus; +import net.engio.mbassy.bus.BusFactory; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.SyncMessageBus; +import net.engio.mbassy.bus.common.ISyncMessageBus; import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.config.SyncBusConfiguration; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; @@ -17,7 +18,6 @@ import net.engio.mbassy.listeners.MessagesListener; import net.engio.mbassy.messages.MessageTypes; import net.engio.mbassy.messages.MultipartMessage; import net.engio.mbassy.messages.StandardMessage; - import org.junit.Assert; import org.junit.Test; @@ -188,7 +188,7 @@ public abstract class SyncBusTest extends MessageBusTest { @Override protected ISyncMessageBus getSyncMessageBus() { - return new SyncMessageBus(new SyncBusConfiguration()); + return BusFactory.SynchronousOnly(); } } diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index 56af6d3..1ef8841 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -1,8 +1,8 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.IMessageBus; import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.common.IMessageBus; +import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.Invoke; @@ -66,6 +66,7 @@ public class SynchronizedHandlerTest extends MessageBusTest { bus.post(new Object()).asynchronously(); } + // TODO: wait for publication pause(10000); for(SynchronizedWithAsynchronousDelivery handler : handlers){ diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index 747413e..518e8db 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -1,11 +1,11 @@ package net.engio.mbassy.common; import junit.framework.Assert; -import net.engio.mbassy.IPublicationErrorHandler; -import net.engio.mbassy.PublicationError; +import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.messages.MessageTypes; import org.junit.Before; diff --git a/src/test/java/net/engio/mbassy/common/TestUtil.java b/src/test/java/net/engio/mbassy/common/TestUtil.java index b89ee63..3976068 100644 --- a/src/test/java/net/engio/mbassy/common/TestUtil.java +++ b/src/test/java/net/engio/mbassy/common/TestUtil.java @@ -1,7 +1,7 @@ package net.engio.mbassy.common; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.PubSubSupport; +import net.engio.mbassy.bus.common.PubSubSupport; import net.engio.mbassy.subscription.SubscriptionManager; import java.util.Iterator;