From 8ed29f6b7f1c5d9f81785e55af206302a3e6bc6f Mon Sep 17 00:00:00 2001 From: nathan Date: Tue, 3 Feb 2015 22:32:50 +0100 Subject: [PATCH] Further simplification of mbassador --- .../{listener => annotations}/Handler.java | 2 +- .../{listener => annotations}/Listener.java | 2 +- .../Synchronized.java | 2 +- .../mbassy/bus/AbstractPubSubSupport.java | 51 +++++------- .../bus/AbstractSyncAsyncMessageBus.java | 40 ++++----- .../java/net/engio/mbassy/bus/BusRuntime.java | 59 ------------- .../java/net/engio/mbassy/bus/MBassador.java | 47 +++++++---- .../engio/mbassy/bus/MessagePublication.java | 13 +-- .../net/engio/mbassy/bus/SyncMessageBus.java | 39 +++++---- .../GenericMessagePublicationSupport.java | 25 ------ .../engio/mbassy/bus/common/IMessageBus.java | 13 +-- .../mbassy/bus/common/ISyncMessageBus.java | 12 --- .../mbassy/bus/common/PubSubSupport.java | 30 ++++++- .../mbassy/bus/common/RuntimeProvider.java | 11 --- .../mbassy/bus/config/BusConfiguration.java | 8 +- .../bus/config/ConfigurationErrorHandler.java | 10 --- .../net/engio/mbassy/bus/config/Feature.java | 44 +++------- .../mbassy/bus/config/IBusConfiguration.java | 5 -- .../bus/error/MissingPropertyException.java | 2 +- .../bus/publication/IPublicationCommand.java | 15 ---- .../ISyncAsyncPublicationCommand.java | 35 -------- .../bus/publication/SyncAsyncPostCommand.java | 38 --------- .../engio/mbassy/common/ReflectionUtils.java | 2 +- .../dispatch/DelegatingMessageDispatcher.java | 30 ------- .../mbassy/dispatch/HandlerInvocation.java | 30 ------- .../mbassy/dispatch/IHandlerInvocation.java | 9 +- .../mbassy/dispatch/IMessageDispatcher.java | 42 ---------- .../mbassy/dispatch/MessageDispatcher.java | 38 --------- .../dispatch/ReflectiveHandlerInvocation.java | 40 ++------- .../SynchronizedHandlerInvocation.java | 9 +- .../engio/mbassy/listener/MessageHandler.java | 2 + .../engio/mbassy/listener/MetadataReader.java | 1 + .../AbstractSubscriptionContextAware.java | 21 ----- .../ISubscriptionContextAware.java | 17 ---- .../ISubscriptionManagerProvider.java | 9 -- .../mbassy/subscription/Subscription.java | 82 +++++++++++++++---- .../subscription/SubscriptionContext.java | 59 ------------- .../subscription/SubscriptionFactory.java | 68 --------------- .../subscription/SubscriptionManager.java | 35 +++++--- .../SubscriptionManagerProvider.java | 12 --- .../net/engio/mbassy/AsyncFIFOBusTest.java | 50 +---------- .../mbassy/CustomHandlerAnnotationTest.java | 4 +- .../net/engio/mbassy/DeadMessageTest.java | 8 +- .../java/net/engio/mbassy/MBassadorTest.java | 10 +-- .../net/engio/mbassy/MetadataReaderTest.java | 2 +- .../net/engio/mbassy/MethodDispatchTest.java | 19 +++-- .../engio/mbassy/SubscriptionManagerTest.java | 17 ++-- .../java/net/engio/mbassy/SyncBusTest.java | 22 ++--- .../engio/mbassy/SynchronizedHandlerTest.java | 6 +- .../listeners/AbstractMessageListener.java | 2 +- .../listeners/ExceptionThrowingListener.java | 4 +- .../mbassy/listeners/ICountableListener.java | 2 +- .../mbassy/listeners/IMessageListener.java | 2 +- .../listeners/IMultipartMessageListener.java | 2 +- .../mbassy/listeners/MessagesListener.java | 2 +- .../listeners/MultipartMessageListener.java | 2 +- .../mbassy/listeners/ObjectListener.java | 2 +- .../engio/mbassy/listeners/Overloading.java | 4 +- .../listeners/StandardMessageListener.java | 2 +- 59 files changed, 315 insertions(+), 856 deletions(-) rename src/main/java/net/engio/mbassy/{listener => annotations}/Handler.java (93%) rename src/main/java/net/engio/mbassy/{listener => annotations}/Listener.java (94%) rename src/main/java/net/engio/mbassy/{listener => annotations}/Synchronized.java (95%) delete mode 100644 src/main/java/net/engio/mbassy/bus/BusRuntime.java delete mode 100644 src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java delete mode 100644 src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java delete mode 100644 src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java delete mode 100644 src/main/java/net/engio/mbassy/bus/config/ConfigurationErrorHandler.java delete mode 100644 src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java delete mode 100644 src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java delete mode 100644 src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java delete mode 100644 src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java delete mode 100644 src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java delete mode 100644 src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java delete mode 100644 src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/ISubscriptionContextAware.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java delete mode 100644 src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java diff --git a/src/main/java/net/engio/mbassy/listener/Handler.java b/src/main/java/net/engio/mbassy/annotations/Handler.java similarity index 93% rename from src/main/java/net/engio/mbassy/listener/Handler.java rename to src/main/java/net/engio/mbassy/annotations/Handler.java index a1e9980..dc3a948 100644 --- a/src/main/java/net/engio/mbassy/listener/Handler.java +++ b/src/main/java/net/engio/mbassy/annotations/Handler.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.listener; +package net.engio.mbassy.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; diff --git a/src/main/java/net/engio/mbassy/listener/Listener.java b/src/main/java/net/engio/mbassy/annotations/Listener.java similarity index 94% rename from src/main/java/net/engio/mbassy/listener/Listener.java rename to src/main/java/net/engio/mbassy/annotations/Listener.java index c6cbf90..b07aea1 100644 --- a/src/main/java/net/engio/mbassy/listener/Listener.java +++ b/src/main/java/net/engio/mbassy/annotations/Listener.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.listener; +package net.engio.mbassy.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; diff --git a/src/main/java/net/engio/mbassy/listener/Synchronized.java b/src/main/java/net/engio/mbassy/annotations/Synchronized.java similarity index 95% rename from src/main/java/net/engio/mbassy/listener/Synchronized.java rename to src/main/java/net/engio/mbassy/annotations/Synchronized.java index 800278d..747ab4e 100644 --- a/src/main/java/net/engio/mbassy/listener/Synchronized.java +++ b/src/main/java/net/engio/mbassy/annotations/Synchronized.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.listener; +package net.engio.mbassy.annotations; import java.lang.annotation.*; diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index a18febc..d4da00e 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -1,5 +1,10 @@ package net.engio.mbassy.bus; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import net.engio.mbassy.bus.common.DeadMessage; import net.engio.mbassy.bus.common.PubSubSupport; import net.engio.mbassy.bus.config.Feature; @@ -9,11 +14,6 @@ import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.SubscriptionManager; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - /** * The base class for all message bus implementations. * @@ -29,70 +29,63 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { private final SubscriptionManager subscriptionManager; - private final BusRuntime runtime; - public AbstractPubSubSupport(IBusConfiguration configuration) { - this.runtime = new BusRuntime(this); - this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers()); // configure the pub sub feature Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class); - this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider() - .createManager(pubSubFeature.getMetadataReader(), - pubSubFeature.getSubscriptionFactory(), runtime); + this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers()); this.publicationFactory = pubSubFeature.getPublicationFactory(); } protected MessagePublication.Factory getPublicationFactory() { - return publicationFactory; + return this.publicationFactory; } public Collection getRegisteredErrorHandlers() { - return Collections.unmodifiableCollection(errorHandlers); + return Collections.unmodifiableCollection(this.errorHandlers); } + @Override public boolean unsubscribe(Object listener) { - return subscriptionManager.unsubscribe(listener); + return this.subscriptionManager.unsubscribe(listener); } + @Override public void subscribe(Object listener) { - subscriptionManager.subscribe(listener); + this.subscriptionManager.subscribe(listener); } public final void addErrorHandler(IPublicationErrorHandler handler) { synchronized (this){ - errorHandlers.add(handler); + this.errorHandlers.add(handler); } } - @Override - public BusRuntime getRuntime() { - return runtime; - } - protected IMessagePublication createMessagePublication(T message) { - Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); - if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) { + Class class1 = message.getClass(); + Collection subscriptions = getSubscriptionsByMessageType(class1); + + if ((subscriptions == null || subscriptions.isEmpty()) && !class1.equals(DeadMessage.class)) { // Dead Event subscriptions = getSubscriptionsByMessageType(DeadMessage.class); - return getPublicationFactory().createPublication(runtime, subscriptions, new DeadMessage(message)); + return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message)); } else { - return getPublicationFactory().createPublication(runtime, subscriptions, message); + return getPublicationFactory().createPublication(this, subscriptions, message); } } // obtain the set of subscriptions for the given message type // Note: never returns null! - protected Collection getSubscriptionsByMessageType(Class messageType) { - return subscriptionManager.getSubscriptionsByMessageType(messageType); + protected Collection getSubscriptionsByMessageType(Class messageType) { + return this.subscriptionManager.getSubscriptionsByMessageType(messageType); } public void handlePublicationError(PublicationError error) { - for (IPublicationErrorHandler errorHandler : errorHandlers) { + for (IPublicationErrorHandler errorHandler : this.errorHandlers) { errorHandler.handleError(error); } } diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index 96e1d95..00a968b 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -1,11 +1,5 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.bus.common.IMessageBus; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; -import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -13,14 +7,19 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import net.engio.mbassy.bus.common.IMessageBus; +import net.engio.mbassy.bus.config.Feature; +import net.engio.mbassy.bus.config.IBusConfiguration; +import net.engio.mbassy.bus.error.PublicationError; + /** * The base class for all message bus implementations with support for asynchronous message dispatch * * @param * @param

*/ -public abstract class AbstractSyncAsyncMessageBus - extends AbstractPubSubSupport implements IMessageBus { +public abstract class AbstractSyncAsyncMessageBus + extends AbstractPubSubSupport implements IMessageBus { // executor for asynchronous message handlers private final ExecutorService executor; @@ -36,15 +35,13 @@ public abstract class AbstractSyncAsyncMessageBus(asyncDispatch.getNumberOfMessageDispatchers()); + this.pendingMessages = asyncDispatch.getPendingMessages(); + this.dispatchers = new ArrayList(asyncDispatch.getNumberOfMessageDispatchers()); initDispatcherThreads(asyncDispatch); // configure asynchronous handler invocation Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class); this.executor = asyncInvocation.getExecutor(); - getRuntime().add(BusRuntime.Properties.AsynchronousHandlerExecutor, executor); - } // initialize the dispatch workers @@ -53,11 +50,12 @@ public abstract class AbstractSyncAsyncMessageBus 0; + return this.pendingMessages.size() > 0; } @Override public Executor getExecutor() { - return executor; + return this.executor; } } diff --git a/src/main/java/net/engio/mbassy/bus/BusRuntime.java b/src/main/java/net/engio/mbassy/bus/BusRuntime.java deleted file mode 100644 index 047edb0..0000000 --- a/src/main/java/net/engio/mbassy/bus/BusRuntime.java +++ /dev/null @@ -1,59 +0,0 @@ -package net.engio.mbassy.bus; - -import net.engio.mbassy.bus.common.PubSubSupport; -import net.engio.mbassy.bus.error.MissingPropertyException; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -/** - * Message bus implementations potentially vary in the features they provide and consequently in the components and properties - * they expose. The runtime is a container for all those dynamic properties and components and is meant to be passed around - * between collaborating objects such that they may access the different functionality provided by the bus implementation - * they all belong to. - * - * It is the responsibility of the bus implementation to create and configure the runtime according to its capabilities, - * - */ -public class BusRuntime { - - public static class Properties{ - - public static final String ErrorHandlers = "error.handlers"; - public static final String AsynchronousHandlerExecutor = "handler.async.executor"; - - } - - private PubSubSupport provider; - - private Map properties = new HashMap(); - - public BusRuntime(PubSubSupport provider) { - this.provider = provider; - } - - public T get(String key){ - if(!contains(key)) - throw new MissingPropertyException("The property " + key + " is not available in this runtime"); - else return (T) properties.get(key); - } - - public PubSubSupport getProvider(){ - return provider; - } - - public Collection getKeys(){ - return properties.keySet(); - } - - public BusRuntime add(String key, Object property){ - properties.put(key, property); - return this; - } - - public boolean contains(String key){ - return properties.containsKey(key); - } - -} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index ec249f4..ee8868e 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -1,16 +1,15 @@ package net.engio.mbassy.bus; +import java.util.concurrent.TimeUnit; + import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.bus.publication.SyncAsyncPostCommand; - -import java.util.concurrent.TimeUnit; -public class MBassador extends AbstractSyncAsyncMessageBus> implements IMessageBus> { +public class MBassador extends AbstractSyncAsyncMessageBus implements IMessageBus { public MBassador(IBusConfiguration configuration) { super(configuration); @@ -23,22 +22,13 @@ public class MBassador extends AbstractSyncAsyncMessageBus extends AbstractSyncAsyncMessageBus post(T message) { - return new SyncAsyncPostCommand(this, message); + /** + * Execute the message publication asynchronously. The behaviour of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + * + * @return A message publication that can be used to access information about it's state + */ + public IMessagePublication publishAsync(T message) { + return addAsynchronousPublication(createMessagePublication(message)); } + + /** + * Execute the message publication asynchronously. The behaviour of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call will block until the message can be placed in the queue + * or the timeout is reached. + * + * @return A message publication that wraps up the publication request + */ + public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) { + return addAsynchronousPublication(createMessagePublication(message), timeout, unit); + } } diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java index 478bd9c..7cc3591 100644 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/MessagePublication.java @@ -3,6 +3,7 @@ package net.engio.mbassy.bus; import java.util.Collection; import net.engio.mbassy.bus.common.DeadMessage; +import net.engio.mbassy.bus.common.PubSubSupport; import net.engio.mbassy.subscription.Subscription; /** @@ -23,10 +24,10 @@ public class MessagePublication implements IMessagePublication { // message publications can be referenced by multiple threads to query publication progress private volatile State state = State.Initial; private volatile boolean delivered = false; - private final BusRuntime runtime; + private PubSubSupport pubSub; - protected MessagePublication(BusRuntime runtime, Collection subscriptions, Object message, State initialState) { - this.runtime = runtime; + protected MessagePublication(PubSubSupport pubSub, Collection subscriptions, Object message, State initialState) { + this.pubSub = pubSub; this.subscriptions = subscriptions; this.message = message; this.state = initialState; @@ -50,7 +51,7 @@ public class MessagePublication implements IMessagePublication { // if the message has not been marked delivered by the dispatcher if (!this.delivered) { if (!isDeadEvent()) { - this.runtime.getProvider().publish(new DeadMessage(this.message)); + this.pubSub.publish(new DeadMessage(this.message)); } } } @@ -100,8 +101,8 @@ public class MessagePublication implements IMessagePublication { public static class Factory { - public IMessagePublication createPublication(BusRuntime runtime, Collection subscriptions, Object message) { - return new MessagePublication(runtime, subscriptions, message, State.Initial); + public IMessagePublication createPublication(PubSubSupport pubSub, Collection subscriptions, Object message) { + return new MessagePublication(pubSub, subscriptions, message, State.Initial); } } diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index 5cd4892..d1f9139 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -1,18 +1,18 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.bus.common.ErrorHandlingSupport; -import net.engio.mbassy.bus.common.GenericMessagePublicationSupport; -import net.engio.mbassy.bus.common.PubSubSupport; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.bus.publication.IPublicationCommand; /** * A message bus implementation that offers only synchronous message publication. Using this bus * will not create any new threads. * */ -public class SyncMessageBus extends AbstractPubSubSupport implements PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport{ +public class SyncMessageBus extends AbstractPubSubSupport implements IMessageBus { public SyncMessageBus(IBusConfiguration configuration) { @@ -33,21 +33,28 @@ public class SyncMessageBus extends AbstractPubSubSupport implements PubSu } @Override - public SyncPostCommand post(T message) { - return new SyncPostCommand(message); + public IMessagePublication publishAsync(T message) { + publish(message); + return null; } - public class SyncPostCommand implements IPublicationCommand { + @Override + public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) { + publish(message); + return null; + } - private T message; + @Override + public Executor getExecutor() { + return null; + } - public SyncPostCommand(T message) { - this.message = message; - } + @Override + public boolean hasPendingMessages() { + return false; + } - @Override - public void now() { - publish(message); - } + @Override + public void shutdown() { } } diff --git a/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java b/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java deleted file mode 100644 index d55a8e1..0000000 --- a/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java +++ /dev/null @@ -1,25 +0,0 @@ -package net.engio.mbassy.bus.common; - -import net.engio.mbassy.bus.publication.IPublicationCommand; - -/** - * This interface is meant to be implemented by different bus implementations to offer a consistent way - * to plugin different flavors of message publication. - * - * The parametrization of the IPostCommand influences which publication flavours are available. - * - */ -public interface GenericMessagePublicationSupport extends PubSubSupport, ErrorHandlingSupport{ - - /** - * Publish a message to the bus using on of its supported message publication mechanisms. The supported - * mechanisms depend on the available implementation and are exposed as subclasses of IPublicationCommand. - * The standard mechanism is the synchronous dispatch which will publish the message in the current thread - * and returns after every matching handler has been invoked. @See IPublicationCommand. - * - * @param message - * @return - */ - P post(T message); - -} diff --git a/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java b/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java index 2e696fd..ec45c8c 100644 --- a/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/common/IMessageBus.java @@ -1,7 +1,5 @@ package net.engio.mbassy.bus.common; -import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand; - import java.util.concurrent.Executor; /** @@ -57,14 +55,7 @@ import java.util.concurrent.Executor; * @Author bennidi * Date: 2/8/12 */ -public interface IMessageBus - extends GenericMessagePublicationSupport{ - - /** - * {@inheritDoc} - */ - @Override - P post(T message); +public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport { /** * Get the executor service that is used for asynchronous message publications. @@ -89,6 +80,4 @@ public interface IMessageBus * to further use the message bus. */ void shutdown(); - - } diff --git a/src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java deleted file mode 100644 index 7879772..0000000 --- a/src/main/java/net/engio/mbassy/bus/common/ISyncMessageBus.java +++ /dev/null @@ -1,12 +0,0 @@ -package net.engio.mbassy.bus.common; - -import net.engio.mbassy.bus.publication.IPublicationCommand; - -/** - * @author bennidi - * Date: 3/29/13 - */ -public interface ISyncMessageBus extends PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport{ - - -} diff --git a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java index f0cebc1..4c85e7c 100644 --- a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java @@ -1,12 +1,16 @@ package net.engio.mbassy.bus.common; +import java.util.concurrent.TimeUnit; + +import net.engio.mbassy.bus.IMessagePublication; + /** * This interface defines the very basic message publication semantics according to the publish subscribe pattern. * Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its * handlers will be registered and start to receive matching message publications. * */ -public interface PubSubSupport extends RuntimeProvider{ +public interface PubSubSupport { /** * Subscribe all handlers of the given listener. Any listener is only subscribed once @@ -38,4 +42,28 @@ public interface PubSubSupport extends RuntimeProvider{ * @param message */ void publish(T message); + + + /** + * Execute the message publication asynchronously. The behaviour of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + * + * @return A message publication that can be used to access information about it's state + */ + IMessagePublication publishAsync(T message); + + /** + * Execute the message publication asynchronously. The behaviour of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call will block until the message can be placed in the queue + * or the timeout is reached. + * + * @return A message publication that wraps up the publication request + */ + IMessagePublication publishAsync(T message, long timeout, TimeUnit unit); } diff --git a/src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java b/src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java deleted file mode 100644 index 6ae2ed9..0000000 --- a/src/main/java/net/engio/mbassy/bus/common/RuntimeProvider.java +++ /dev/null @@ -1,11 +0,0 @@ -package net.engio.mbassy.bus.common; - -import net.engio.mbassy.bus.BusRuntime; - -/** - * Each message bus provides a runtime object to access its dynamic features and runtime configuration. - */ -public interface RuntimeProvider { - - BusRuntime getRuntime(); -} diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java index 561ad1f..17ce0e0 100644 --- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java @@ -17,17 +17,13 @@ public class BusConfiguration implements IBusConfiguration { @Override public T getFeature(Class feature) { - return (T)features.get(feature); + return (T)this.features.get(feature); } @Override public IBusConfiguration addFeature(Feature feature) { - features.put(feature.getClass(), feature); + this.features.put(feature.getClass(), feature); return this; } - @Override - public IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler) { - return null; // TODO: implement configuration validation - } } diff --git a/src/main/java/net/engio/mbassy/bus/config/ConfigurationErrorHandler.java b/src/main/java/net/engio/mbassy/bus/config/ConfigurationErrorHandler.java deleted file mode 100644 index 7554bdd..0000000 --- a/src/main/java/net/engio/mbassy/bus/config/ConfigurationErrorHandler.java +++ /dev/null @@ -1,10 +0,0 @@ -package net.engio.mbassy.bus.config; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 8/29/14 - */ -public interface ConfigurationErrorHandler { -} diff --git a/src/main/java/net/engio/mbassy/bus/config/Feature.java b/src/main/java/net/engio/mbassy/bus/config/Feature.java index 5d0cec5..76286b7 100644 --- a/src/main/java/net/engio/mbassy/bus/config/Feature.java +++ b/src/main/java/net/engio/mbassy/bus/config/Feature.java @@ -1,14 +1,17 @@ package net.engio.mbassy.bus.config; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; -import net.engio.mbassy.subscription.ISubscriptionManagerProvider; -import net.engio.mbassy.subscription.SubscriptionFactory; -import net.engio.mbassy.subscription.SubscriptionManagerProvider; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * A feature defines the configuration of a specific functionality of a message bus. @@ -19,38 +22,17 @@ import java.util.concurrent.atomic.AtomicInteger; public interface Feature { - class SyncPubSub implements Feature{ + class SyncPubSub implements Feature { public static final SyncPubSub Default(){ return new SyncPubSub() .setMetadataReader(new MetadataReader()) - .setPublicationFactory(new MessagePublication.Factory()) - .setSubscriptionFactory(new SubscriptionFactory()) - .setSubscriptionManagerProvider(new SubscriptionManagerProvider()); + .setPublicationFactory(new MessagePublication.Factory()); } private MessagePublication.Factory publicationFactory; private MetadataReader metadataReader; - private SubscriptionFactory subscriptionFactory; - private ISubscriptionManagerProvider subscriptionManagerProvider; - public ISubscriptionManagerProvider getSubscriptionManagerProvider() { - return subscriptionManagerProvider; - } - - public SyncPubSub setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { - this.subscriptionManagerProvider = subscriptionManagerProvider; - return this; - } - - public SubscriptionFactory getSubscriptionFactory() { - return subscriptionFactory; - } - - public SyncPubSub setSubscriptionFactory(SubscriptionFactory subscriptionFactory) { - this.subscriptionFactory = subscriptionFactory; - return this; - } public MetadataReader getMetadataReader() { return metadataReader; @@ -76,7 +58,7 @@ public interface Feature { } } - class AsynchronousHandlerInvocation implements Feature{ + class AsynchronousHandlerInvocation implements Feature { protected static final ThreadFactory MessageHandlerThreadFactory = new ThreadFactory() { @@ -114,7 +96,7 @@ public interface Feature { } } - class AsynchronousMessageDispatch implements Feature{ + class AsynchronousMessageDispatch implements Feature { protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() { diff --git a/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java index 51581c2..48248bc 100644 --- a/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java @@ -22,9 +22,4 @@ public interface IBusConfiguration{ T getFeature(Class feature); IBusConfiguration addFeature(Feature feature); - - IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler); - - - } diff --git a/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java b/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java index 08faf21..29e75ac 100644 --- a/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java +++ b/src/main/java/net/engio/mbassy/bus/error/MissingPropertyException.java @@ -6,7 +6,7 @@ package net.engio.mbassy.bus.error; * i.e. one component is trying to access a feature of another component that does not * provide the feature (e.g. asynchronous functionality within a synchronous bus) */ -public class MissingPropertyException extends RuntimeException{ +public class MissingPropertyException extends RuntimeException { public MissingPropertyException(String message) { super(message); diff --git a/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java b/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java deleted file mode 100644 index f26e695..0000000 --- a/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java +++ /dev/null @@ -1,15 +0,0 @@ -package net.engio.mbassy.bus.publication; - -/** - * A publication command is used as an intermediate object created by a call to the message bus' post method. - * It encapsulates the message publication flavors provided by the message bus implementation that created the command. - * Subclasses may extend this interface and add functionality, e.g. different dispatch schemes. - */ -public interface IPublicationCommand { - - /** - * Execute the message publication immediately. This call blocks until every matching message handler - * has been invoked. - */ - void now(); -} diff --git a/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java b/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java deleted file mode 100644 index d5ad5b3..0000000 --- a/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java +++ /dev/null @@ -1,35 +0,0 @@ -package net.engio.mbassy.bus.publication; - -import net.engio.mbassy.bus.IMessagePublication; - -import java.util.concurrent.TimeUnit; - -/** - * - * -*/ -public interface ISyncAsyncPublicationCommand extends IPublicationCommand { - - /** - * Execute the message publication asynchronously. The behaviour of this method depends on the - * configured queuing strategy: - *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call might block until the message can be placed in the queue. - * - * @return A message publication that can be used to access information about the state of - */ - IMessagePublication asynchronously(); - - /** - * Execute the message publication asynchronously. The behaviour of this method depends on the - * configured queuing strategy: - *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call will block until the message can be placed in the queue - * or the timeout is reached. - * - * @return A message publication that wraps up the publication request - */ - IMessagePublication asynchronously(long timeout, TimeUnit unit); -} diff --git a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java b/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java deleted file mode 100644 index 87601da..0000000 --- a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java +++ /dev/null @@ -1,38 +0,0 @@ -package net.engio.mbassy.bus.publication; - -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.IMessagePublication; - -import java.util.concurrent.TimeUnit; - -/** - * This post command provides access to standard synchronous and asynchronous dispatch - * - * @author bennidi - * Date: 11/12/12 - */ -public class SyncAsyncPostCommand implements ISyncAsyncPublicationCommand { - - private T message; - private MBassador mBassador; - - public SyncAsyncPostCommand(MBassador mBassador, T message) { - this.mBassador = mBassador; - this.message = message; - } - - @Override - public void now() { - mBassador.publish(message); - } - - @Override - public IMessagePublication asynchronously() { - return mBassador.publishAsync(message); - } - - @Override - public IMessagePublication asynchronously(long timeout, TimeUnit unit) { - return mBassador.publishAsync(message, timeout, unit); - } -} diff --git a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java index ea74076..945060d 100644 --- a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java @@ -8,7 +8,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; /** * @author bennidi diff --git a/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java deleted file mode 100644 index c05e865..0000000 --- a/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java +++ /dev/null @@ -1,30 +0,0 @@ -package net.engio.mbassy.dispatch; - -import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; - -/** - * A delegating dispatcher wraps additional logic around a given delegate. Essentially its - * an implementation of the decorator pattern. - * - * @author bennidi - * Date: 3/1/13 - */ -public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher { - - private final IMessageDispatcher delegate; - - - public DelegatingMessageDispatcher(IMessageDispatcher delegate) { - super(delegate.getContext()); - this.delegate = delegate; - } - - protected IMessageDispatcher getDelegate() { - return delegate; - } - - @Override - public IHandlerInvocation getInvocation() { - return delegate.getInvocation(); - } -} diff --git a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java deleted file mode 100644 index 8daf04a..0000000 --- a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java +++ /dev/null @@ -1,30 +0,0 @@ -package net.engio.mbassy.dispatch; - -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; -import net.engio.mbassy.subscription.SubscriptionContext; - -import java.util.Collection; - -/** - * This is the base class for handler invocations that already implements all context related methods only leaving the implementation of the actual invocation mechanism to the concrete subclass. - * - * @author bennidi - * Date: 3/29/13 - */ -public abstract class HandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation{ - - - private final Collection errorHandlers; - - public HandlerInvocation(SubscriptionContext context) { - super(context); - errorHandlers = context.getErrorHandlers(); - } - - protected void handlePublicationError(PublicationError error){ - for(IPublicationErrorHandler handler : errorHandlers) - handler.handleError(error); - } -} diff --git a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java index 58468bc..7ec1367 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.subscription.ISubscriptionContextAware; +import java.lang.reflect.Method; /** * A handler invocation encapsulates the logic that is used to invoke a single @@ -15,15 +15,16 @@ import net.engio.mbassy.subscription.ISubscriptionContextAware; * @author bennidi * Date: 11/23/12 */ -public interface IHandlerInvocation extends ISubscriptionContextAware { +public interface IHandlerInvocation { /** * Invoke the message delivery logic of this handler * - * @param handler The listener that will receive the message. This can be a reference to a method object + * @param listener The listener that will receive the message. This can be a reference to a method object * from the java reflection api or any other wrapper that can be used to invoke the handler * @param message The message to be delivered to the handler. This can be any object compatible with the object * type that the handler consumes + * @param handler The handler (method) that will be called via reflection */ - void invoke(HANDLER handler, MESSAGE message); + void invoke(Object listener, Object message, Method handler) throws Throwable; } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java deleted file mode 100644 index 23fcafe..0000000 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java +++ /dev/null @@ -1,42 +0,0 @@ -package net.engio.mbassy.dispatch; - -import net.engio.mbassy.bus.IMessagePublication; -import net.engio.mbassy.subscription.ISubscriptionContextAware; - -/** - * A message dispatcher provides the functionality to deliver a single message - * to a set of listeners. A message dispatcher uses a message context to access - * all information necessary for the message delivery. - *

- * The delivery of a single message to a single listener is responsibility of the - * handler invocation object associated with the dispatcher. - *

- * Implementations if IMessageDispatcher are partially designed using decorator pattern - * such that it is possible to compose different message dispatchers into dispatcher chains - * to achieve more complex dispatch logic. - * - * @author bennidi - * Date: 11/23/12 - */ -public interface IMessageDispatcher extends ISubscriptionContextAware { - - /** - * Delivers the given message to the given set of listeners. - * Delivery may be delayed, aborted or restricted in various ways, depending - * on the configuration of the dispatcher - * - * @param publication The message publication that initiated the dispatch - * @param message The message that should be delivered to the listeners - * @param listeners The listeners that should receive the message - */ - void dispatch(IMessagePublication publication, Object message, Iterable listeners); - - /** - * Get the handler invocation that will be used to deliver the - * message to each listener. - * - * @return the handler invocation that will be used to deliver the - * message to each listener - */ - IHandlerInvocation getInvocation(); -} diff --git a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java deleted file mode 100644 index 2aef9e9..0000000 --- a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java +++ /dev/null @@ -1,38 +0,0 @@ -package net.engio.mbassy.dispatch; - -import net.engio.mbassy.bus.IMessagePublication; -import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; -import net.engio.mbassy.subscription.SubscriptionContext; - -/** - * Standard implementation for direct, unfiltered message delivery. - *

- * For each message delivery, this dispatcher iterates over the listeners - * and uses the previously provided handler invocation to deliver the message - * to each listener - * - * @author bennidi - * Date: 11/23/12 - */ -public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher { - - private final IHandlerInvocation invocation; - - public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) { - super(context); - this.invocation = invocation; - } - - @Override - public void dispatch(final IMessagePublication publication, final Object message, final Iterable listeners){ - publication.markDelivered(); - for (Object listener : listeners) { - getInvocation().invoke(listener, message); - } - } - - @Override - public IHandlerInvocation getInvocation() { - return invocation; - } -} diff --git a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java index 796c3d6..f4e58d8 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -1,9 +1,5 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.error.PublicationError; -import net.engio.mbassy.subscription.SubscriptionContext; - -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; /** @@ -12,40 +8,14 @@ import java.lang.reflect.Method; * @author bennidi * Date: 11/23/12 */ -public class ReflectiveHandlerInvocation extends HandlerInvocation{ +public class ReflectiveHandlerInvocation implements IHandlerInvocation { - public ReflectiveHandlerInvocation(SubscriptionContext context) { - super(context); + public ReflectiveHandlerInvocation() { + super(); } - protected void invokeHandler(final Object message, final Object listener, Method handler){ - try { - handler.invoke(listener, message); - } catch (IllegalAccessException e) { - handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + - "The class or method is not accessible", - handler, listener, message)); - } catch (IllegalArgumentException e) { - handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0], - handler, listener, message)); - } catch (InvocationTargetException e) { - handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + - "Message handler threw exception", - handler, listener, message)); - } catch (Throwable e) { - handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + - "The handler code threw an exception", - handler, listener, message)); - } - } - - /** - * {@inheritDoc} - */ @Override - public void invoke(final Object listener, final Object message){ - invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler()); + public void invoke(final Object listener, final Object message, Method handler) throws Throwable { + handler.invoke(listener, message); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java index 9213f91..269a0df 100644 --- a/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; +import java.lang.reflect.Method; /** * Synchronizes message handler invocations for all handlers that specify @Synchronized @@ -8,12 +8,11 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; * @author bennidi * Date: 3/31/13 */ -public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { +public class SynchronizedHandlerInvocation implements IHandlerInvocation { private IHandlerInvocation delegate; public SynchronizedHandlerInvocation(IHandlerInvocation delegate) { - super(delegate.getContext()); this.delegate = delegate; } @@ -21,9 +20,9 @@ public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAw * {@inheritDoc} */ @Override - public void invoke(final Object listener, final Object message){ + public void invoke(final Object listener, final Object message, Method handler) throws Throwable { synchronized (listener){ - delegate.invoke(listener, message); + this.delegate.invoke(listener, message, handler); } } diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/listener/MessageHandler.java index b340793..ea58b99 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandler.java @@ -3,6 +3,8 @@ package net.engio.mbassy.listener; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.annotations.Synchronized; import net.engio.mbassy.common.ReflectionUtils; /** diff --git a/src/main/java/net/engio/mbassy/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/listener/MetadataReader.java index 9b0309b..d76eb22 100644 --- a/src/main/java/net/engio/mbassy/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/listener/MetadataReader.java @@ -4,6 +4,7 @@ import java.lang.reflect.Method; import java.util.LinkedList; import java.util.List; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.common.ReflectionUtils; /** diff --git a/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java b/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java deleted file mode 100644 index 39004f6..0000000 --- a/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java +++ /dev/null @@ -1,21 +0,0 @@ -package net.engio.mbassy.subscription; - -/** - * The base implementation for subscription context aware objects (mightily obvious :) - * - * @author bennidi - * Date: 3/1/13 - */ -public class AbstractSubscriptionContextAware implements ISubscriptionContextAware { - - private final SubscriptionContext context; - - public AbstractSubscriptionContextAware(SubscriptionContext context) { - this.context = context; - } - - public SubscriptionContext getContext() { - return context; - } - -} diff --git a/src/main/java/net/engio/mbassy/subscription/ISubscriptionContextAware.java b/src/main/java/net/engio/mbassy/subscription/ISubscriptionContextAware.java deleted file mode 100644 index d3ca351..0000000 --- a/src/main/java/net/engio/mbassy/subscription/ISubscriptionContextAware.java +++ /dev/null @@ -1,17 +0,0 @@ -package net.engio.mbassy.subscription; - -/** - * This interface marks components that have access to the subscription context. - * - * @author bennidi - * Date: 3/1/13 - */ -public interface ISubscriptionContextAware{ - - /** - * Get the subscription context associated with this object - * - * @return the subscription context associated with this object - */ - SubscriptionContext getContext(); -} diff --git a/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java deleted file mode 100644 index 4094bab..0000000 --- a/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java +++ /dev/null @@ -1,9 +0,0 @@ -package net.engio.mbassy.subscription; - -import net.engio.mbassy.bus.BusRuntime; -import net.engio.mbassy.listener.MetadataReader; - -public interface ISubscriptionManagerProvider { - SubscriptionManager createManager(MetadataReader reader, - SubscriptionFactory factory, BusRuntime runtime); -} diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index 7cf77c4..e91c0e8 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -1,12 +1,19 @@ package net.engio.mbassy.subscription; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; + import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.IConcurrentSet; -import net.engio.mbassy.dispatch.IMessageDispatcher; +import net.engio.mbassy.dispatch.IHandlerInvocation; +import net.engio.mbassy.listener.MessageHandler; /** * A subscription is a thread-safe container that manages exactly one message handler of all registered - * message listeners of the same class, i.e. all subscribed instances (exlcuding subclasses) of a SingleMessageHandler.class + * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class * will be referenced in the subscription created for SingleMessageHandler.class. * * There will be as many unique subscription objects per message listener class as there are message handlers @@ -18,15 +25,22 @@ import net.engio.mbassy.dispatch.IMessageDispatcher; */ public class Subscription { + // the handler's metadata -> for each handler in a listener, a unique subscription context is created + private final MessageHandler handlerMetadata; + + // error handling is first-class functionality + private final Collection errorHandlers; + + private final IHandlerInvocation invocation; + protected final IConcurrentSet listeners; - private final IMessageDispatcher dispatcher; + Subscription(MessageHandler handler, Collection errorHandlers, + IHandlerInvocation invocation, IConcurrentSet listeners) { - private final SubscriptionContext context; - - Subscription(SubscriptionContext context, IMessageDispatcher dispatcher, IConcurrentSet listeners) { - this.context = context; - this.dispatcher = dispatcher; + this.handlerMetadata = handler; + this.errorHandlers = errorHandlers; + this.invocation = invocation; this.listeners = listeners; } @@ -37,7 +51,7 @@ public class Subscription { * @return */ public boolean belongsTo(Class listener){ - return this.context.getHandlerMetadata().isFromListener(listener); + return this.handlerMetadata.isFromListener(listener); } /** @@ -55,17 +69,57 @@ public class Subscription { * @return */ public boolean handlesMessageType(Class messageType) { - return this.context.getHandlerMetadata().handlesMessage(messageType); + return this.handlerMetadata.handlesMessage(messageType); } public Class[] getHandledMessageTypes(){ - return this.context.getHandlerMetadata().getHandledMessages(); + return this.handlerMetadata.getHandledMessages(); + } + + public void publish(IMessagePublication publication, Object message){ + if (this.listeners.size() > 0) { + publication.markDelivered(); + + /** + * Delivers the given message to the given set of listeners. + * Delivery may be delayed, aborted or restricted in various ways, depending + * on the configuration of the dispatcher + * + * @param publication The message publication that initiated the dispatch + * @param message The message that should be delivered to the listeners + * @param listeners The listeners that should receive the message + */ + Method handler = this.handlerMetadata.getHandler(); + + for (Object listener : this.listeners) { + try { + this.invocation.invoke(listener, message, handler); + } catch (IllegalAccessException e) { + handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + + "The class or method is not accessible", + handler, listener, message)); + } catch (IllegalArgumentException e) { + handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0], + handler, listener, message)); + } catch (InvocationTargetException e) { + handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + + "Message handler threw exception", + handler, listener, message)); + } catch (Throwable e) { + handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + + "The handler code threw an exception", + handler, listener, message)); + } + } + } } - public void publish(IMessagePublication publication, Object message){ - if(this.listeners.size() > 0) { - this.dispatcher.dispatch(publication, message, this.listeners); + private final void handlePublicationError(PublicationError error) { + for (IPublicationErrorHandler handler : this.errorHandlers) { + handler.handleError(error); } } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java deleted file mode 100644 index 458378b..0000000 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java +++ /dev/null @@ -1,59 +0,0 @@ -package net.engio.mbassy.subscription; - -import net.engio.mbassy.bus.BusRuntime; -import net.engio.mbassy.bus.common.RuntimeProvider; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.listener.MessageHandler; - -import java.util.Collection; - -/** - * The subscription context holds all (meta)data/objects that are relevant to successfully publish - * a message within a subscription. A one-to-one relation between a subscription and - * subscription context holds -> a subscription context is created for each distinct subscription - * managed by the subscription manager. - * - * @author bennidi - * Date: 11/23/12 - */ -public class SubscriptionContext implements RuntimeProvider { - - // the handler's metadata -> for each handler in a listener, a unique subscription context is created - private final MessageHandler handlerMetadata; - - // error handling is first-class functionality - private final Collection errorHandlers; - - private BusRuntime runtime; - - public SubscriptionContext(BusRuntime runtime, MessageHandler handlerMetadata, - Collection errorHandlers) { - this.runtime = runtime; - this.handlerMetadata = handlerMetadata; - this.errorHandlers = errorHandlers; - } - - /** - * Get the meta data that specifies the characteristics of the message handler - * that is associated with this context - * - * @return - */ - public MessageHandler getHandlerMetadata() { - return handlerMetadata; - } - - /** - * Get the error handlers registered with the enclosing bus. - * @return - */ - public Collection getErrorHandlers(){ - return errorHandlers; - } - - @Override - public BusRuntime getRuntime() { - return runtime; - } - -} diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java deleted file mode 100644 index 21ed66d..0000000 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -package net.engio.mbassy.subscription; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Modifier; -import java.util.Collection; - -import net.engio.mbassy.bus.BusRuntime; -import net.engio.mbassy.bus.error.IPublicationErrorHandler; -import net.engio.mbassy.bus.error.MessageBusException; -import net.engio.mbassy.common.WeakConcurrentSet; -import net.engio.mbassy.dispatch.HandlerInvocation; -import net.engio.mbassy.dispatch.IHandlerInvocation; -import net.engio.mbassy.dispatch.IMessageDispatcher; -import net.engio.mbassy.dispatch.MessageDispatcher; -import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; -import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation; -import net.engio.mbassy.listener.MessageHandler; - -/** - * The subscription factory is used to create an empty subscription for specific message handler. - * The message handler's configuration is evaluated and a corresponding subscription is built. - */ -public class SubscriptionFactory { - - public Subscription createSubscription(BusRuntime runtime, MessageHandler handlerMetadata) throws MessageBusException{ - try { - Collection errorHandlers = runtime.get(BusRuntime.Properties.ErrorHandlers); - SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers); - IHandlerInvocation invocation = buildInvocationForHandler(context); - IMessageDispatcher dispatcher = buildDispatcher(context, invocation); - return new Subscription(context, dispatcher, new WeakConcurrentSet()); - } catch (Exception e) { - throw new MessageBusException(e); - } - } - - protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context) throws Exception { - IHandlerInvocation invocation = createBaseHandlerInvocation(context); - if(context.getHandlerMetadata().isSynchronized()){ - invocation = new SynchronizedHandlerInvocation(invocation); - } - - return invocation; - } - - protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation) { - IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation); - - return dispatcher; - } - - protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws MessageBusException { - Class invocation = ReflectiveHandlerInvocation.class; - if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){ - throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class"); - } - try { - Constructor constructor = invocation.getConstructor(SubscriptionContext.class); - return constructor.newInstance(context); - } catch (NoSuchMethodException e) { - throw new MessageBusException("The provided handler invocation did not specify the necessary constructor " - + invocation.getSimpleName() + "(SubscriptionContext);", e); - } catch (Exception e) { - throw new MessageBusException("Could not instantiate the provided handler invocation " - + invocation.getSimpleName(), e); - } - } -} diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java index 2201d0c..4d04035 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java @@ -9,8 +9,13 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; +import net.engio.mbassy.bus.error.MessageBusException; import net.engio.mbassy.common.ReflectionUtils; +import net.engio.mbassy.common.WeakConcurrentSet; +import net.engio.mbassy.dispatch.IHandlerInvocation; +import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; +import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.listener.MessageHandler; import net.engio.mbassy.listener.MetadataReader; @@ -25,8 +30,6 @@ import net.engio.mbassy.listener.MetadataReader; */ public class SubscriptionManager { - private static final Object setObject = new Object(); - // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader; @@ -46,19 +49,15 @@ public class SubscriptionManager { // remember already processed classes that do not contain any message handlers private final ConcurrentHashMap, Object> nonListeners = new ConcurrentHashMap, Object>(); - // this factory is used to create specialized subscriptions based on the given message handler configuration - // it can be customized by implementing the getSubscriptionFactory() method - private final SubscriptionFactory subscriptionFactory; - // synchronize read/write acces to the subscription maps private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final BusRuntime runtime; + // error handling is first-class functionality + private final Collection errorHandlers; - public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory, BusRuntime runtime) { + public SubscriptionManager(MetadataReader metadataReader, Collection errorHandlers) { this.metadataReader = metadataReader; - this.subscriptionFactory = subscriptionFactory; - this.runtime = runtime; + this.errorHandlers = errorHandlers; } @@ -110,7 +109,19 @@ public class SubscriptionManager { // create subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { // create the subscription - subscriptionsByListener.add(this.subscriptionFactory.createSubscription(this.runtime, messageHandler)); + + try { + IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); + + if (messageHandler.isSynchronized()){ + invocation = new SynchronizedHandlerInvocation(invocation); + } + + Subscription subscription = new Subscription(messageHandler, this.errorHandlers, invocation, new WeakConcurrentSet()); + subscriptionsByListener.add(subscription); + } catch (Exception e) { + throw new MessageBusException(e); + } } // this will acquire a write lock and handle the case when another thread already subscribed diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java deleted file mode 100644 index fd6f319..0000000 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java +++ /dev/null @@ -1,12 +0,0 @@ -package net.engio.mbassy.subscription; - -import net.engio.mbassy.bus.BusRuntime; -import net.engio.mbassy.listener.MetadataReader; - -public class SubscriptionManagerProvider implements ISubscriptionManagerProvider { - @Override - public SubscriptionManager createManager(MetadataReader reader, - SubscriptionFactory factory, BusRuntime runtime) { - return new SubscriptionManager(reader, factory, runtime); - } -} diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java index c456375..eb6feee 100644 --- a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -3,10 +3,10 @@ package net.engio.mbassy; import java.util.LinkedList; import java.util.List; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.bus.BusFactory; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.listener.Handler; import org.junit.Test; @@ -36,7 +36,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { } // publish in ascending order for(Integer message : messages) { - fifoBUs.post(message).asynchronously(); + fifoBUs.publishAsync(message); } while(fifoBUs.hasPendingMessages()) { @@ -72,7 +72,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { } // publish in ascending order for(Integer message : messages) { - fifoBUs.post(message).asynchronously(); + fifoBUs.publishAsync(message); } while(fifoBUs.hasPendingMessages()) { @@ -88,50 +88,6 @@ public class AsyncFIFOBusTest extends MessageBusTest { } - /* - @Test - public void testMultiThreadedSyncFIFO(){ - // create a fifo bus with 1000 concurrently subscribed listeners - final IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); - - List listeners = new LinkedList(); - for(int i = 0; i < 1000 ; i++){ - SyncListener listener = new SyncListener(); - listeners.add(listener); - fifoBUs.subscribe(listener); - } - - // prepare set of messages in increasing order - final int[] messages = new int[10000]; - for(int i = 0; i < messages.length ; i++){ - messages[i] = i; - } - final AtomicInteger messageIndex = new AtomicInteger(0); - // publish in ascending order - ConcurrentExecutor.runConcurrent(new Runnable() { - @Override - public void run() { - int idx; - while((idx = messageIndex.getAndIncrement()) < messages.length){ - fifoBUs.post(messages[idx]).asynchronously(); - } - } - }, 5); - - while(fifoBUs.hasPendingMessages()) - pause(1000); - - for(SyncListener listener : listeners){ - assertEquals(messages.length, listener.receivedSync.size()); - for(int i=0; i < messages.length; i++){ - assertEquals(messages[i], listener.receivedSync.get(i)); - } - } - - } */ - - - public static class SyncListener { private List receivedSync = new LinkedList(); diff --git a/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java b/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java index 00816f2..1a40cf3 100644 --- a/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java +++ b/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java @@ -8,10 +8,10 @@ import java.lang.annotation.Target; import java.util.HashSet; import java.util.Set; +import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.annotations.Synchronized; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Synchronized; import org.junit.Test; diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index 6431a84..65e8dd7 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -2,13 +2,13 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.common.DeadMessage; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listeners.IMessageListener; import net.engio.mbassy.listeners.MessagesListener; import net.engio.mbassy.listeners.ObjectListener; @@ -78,7 +78,7 @@ public class DeadMessageTest extends MessageBusTest{ ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits); // Only dead message handlers available - bus.post(new Object()).now(); + bus.publish(new Object()); // The message should be caught as dead message since there are no subscribed listeners assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); @@ -88,7 +88,7 @@ public class DeadMessageTest extends MessageBusTest{ // Add object listeners and publish again ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits); - bus.post(new Object()).now(); + bus.publish(new Object()); // verify that no dead message events were produced assertEquals(0, DeadMessagHandler.deadMessages.get()); @@ -97,7 +97,7 @@ public class DeadMessageTest extends MessageBusTest{ ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits); // Only dead message handlers available - bus.post(new Object()).now(); + bus.publish(new Object()); // The message should be caught, as it's the only listener assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java index e1e221b..cc36b87 100644 --- a/src/test/java/net/engio/mbassy/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -44,9 +44,9 @@ public class MBassadorTest extends MessageBusTest { StandardMessage standardMessage = new StandardMessage(); MultipartMessage multipartMessage = new MultipartMessage(); - bus.post(standardMessage).now(); - bus.post(multipartMessage).now(); - bus.post(MessageTypes.Simple).now(); + bus.publish(standardMessage); + bus.publish(multipartMessage); + bus.publish(MessageTypes.Simple); assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class)); assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class)); @@ -78,7 +78,7 @@ public class MBassadorTest extends MessageBusTest { @Override public void run() { - bus.post(MessageTypes.Simple).asynchronously(); + bus.publishAsync(MessageTypes.Simple); } }; @@ -112,7 +112,7 @@ public class MBassadorTest extends MessageBusTest { Runnable publishAndCheck = new Runnable() { @Override public void run() { - bus.post(new StandardMessage()).asynchronously(); + bus.publishAsync(new StandardMessage()); } }; diff --git a/src/test/java/net/engio/mbassy/MetadataReaderTest.java b/src/test/java/net/engio/mbassy/MetadataReaderTest.java index bca2559..2750bf7 100644 --- a/src/test/java/net/engio/mbassy/MetadataReaderTest.java +++ b/src/test/java/net/engio/mbassy/MetadataReaderTest.java @@ -4,8 +4,8 @@ import java.io.BufferedReader; import java.util.HashMap; import java.util.Map; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.common.AssertSupport; -import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.MessageListener; import net.engio.mbassy.listener.MetadataReader; diff --git a/src/test/java/net/engio/mbassy/MethodDispatchTest.java b/src/test/java/net/engio/mbassy/MethodDispatchTest.java index b68a7df..5f50803 100644 --- a/src/test/java/net/engio/mbassy/MethodDispatchTest.java +++ b/src/test/java/net/engio/mbassy/MethodDispatchTest.java @@ -1,9 +1,9 @@ package net.engio.mbassy; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.bus.common.IMessageBus; -import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.listener.Handler; + import org.junit.Test; /** @@ -24,7 +24,7 @@ public class MethodDispatchTest extends MessageBusTest{ @Handler public void handleString(String s) { - listener1Called = true; + MethodDispatchTest.this.listener1Called = true; } } @@ -33,8 +33,9 @@ public class MethodDispatchTest extends MessageBusTest{ public class EventListener2 extends EventListener1 { // redefine handler implementation (not configuration) + @Override public void handleString(String s) { - listener2Called = true; + MethodDispatchTest.this.listener2Called = true; } } @@ -44,14 +45,14 @@ public class MethodDispatchTest extends MessageBusTest{ IMessageBus bus = createBus(SyncAsync()); EventListener2 listener2 = new EventListener2(); bus.subscribe(listener2); - bus.post("jfndf").now(); - assertTrue(listener2Called); - assertFalse(listener1Called); + bus.publish("jfndf"); + assertTrue(this.listener2Called); + assertFalse(this.listener1Called); EventListener1 listener1 = new EventListener1(); bus.subscribe(listener1); - bus.post("jfndf").now(); - assertTrue(listener1Called); + bus.publish("jfndf"); + assertTrue(this.listener1Called); } } diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java index 8e74408..f828acf 100644 --- a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -2,7 +2,7 @@ package net.engio.mbassy; import java.util.Collections; -import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.common.AssertSupport; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; @@ -24,7 +24,6 @@ import net.engio.mbassy.messages.IMultipartMessage; import net.engio.mbassy.messages.MessageTypes; import net.engio.mbassy.messages.MultipartMessage; import net.engio.mbassy.messages.StandardMessage; -import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionManager; import org.junit.Test; @@ -167,7 +166,7 @@ public class SubscriptionManagerTest extends AssertSupport { Overloading.ListenerBase.class, Overloading.ListenerSub.class); - SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime()); + SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.emptyList()); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) @@ -177,22 +176,16 @@ public class SubscriptionManagerTest extends AssertSupport { runTestWith(listeners, expectedSubscriptions); } - private BusRuntime mockedRuntime(){ - return new BusRuntime(null) - .add(BusRuntime.Properties.ErrorHandlers, Collections.EMPTY_SET) - .add(BusRuntime.Properties.AsynchronousHandlerExecutor, null); - } - - private ListenerFactory listeners(Class ...listeners){ + private ListenerFactory listeners(Class ...listeners){ ListenerFactory factory = new ListenerFactory(); - for(Class listener : listeners){ + for (Class listener : listeners){ factory.create(InstancesPerListener, listener); } return factory; } private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){ - final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime()); + final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.emptyList()); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index dfc92a4..1069aa1 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.bus.BusFactory; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.common.GenericMessagePublicationSupport; +import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; @@ -30,12 +30,12 @@ import org.junit.Test; public abstract class SyncBusTest extends MessageBusTest { - protected abstract GenericMessagePublicationSupport getSyncMessageBus(); + protected abstract IMessageBus getSyncMessageBus(); @Test public void testSynchronousMessagePublication() throws Exception { - final GenericMessagePublicationSupport bus = getSyncMessageBus(); + final IMessageBus bus = getSyncMessageBus(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class) @@ -52,10 +52,10 @@ public abstract class SyncBusTest extends MessageBusTest { StandardMessage standardMessage = new StandardMessage(); MultipartMessage multipartMessage = new MultipartMessage(); - bus.post(standardMessage).now(); - bus.post(multipartMessage).now(); - bus.post(MessageTypes.Simple).now(); - bus.post(MessageTypes.Multipart).now(); + bus.publish(standardMessage); + bus.publish(multipartMessage); + bus.publish(MessageTypes.Simple); + bus.publish(MessageTypes.Multipart); assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class)); assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class)); @@ -86,7 +86,7 @@ public abstract class SyncBusTest extends MessageBusTest { } }; - final GenericMessagePublicationSupport bus = getSyncMessageBus(); + final IMessageBus bus = getSyncMessageBus(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); @@ -96,7 +96,7 @@ public abstract class SyncBusTest extends MessageBusTest { Runnable publish = new Runnable() { @Override public void run() { - bus.post(new StandardMessage()).now(); + bus.publish(new StandardMessage()); } }; @@ -115,7 +115,7 @@ public abstract class SyncBusTest extends MessageBusTest { @Override - protected GenericMessagePublicationSupport getSyncMessageBus() { + protected IMessageBus getSyncMessageBus() { return new MBassador(); } @@ -125,7 +125,7 @@ public abstract class SyncBusTest extends MessageBusTest { @Override - protected GenericMessagePublicationSupport getSyncMessageBus() { + protected IMessageBus getSyncMessageBus() { return BusFactory.SynchronousOnly(); } } diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index dc11969..1e15302 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -3,13 +3,13 @@ package net.engio.mbassy; import java.util.LinkedList; import java.util.List; +import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.annotations.Synchronized; import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Synchronized; import org.junit.Test; @@ -41,7 +41,7 @@ public class SynchronizedHandlerTest extends MessageBusTest { IMessagePublication publication = null; for(int i = 0; i < numberOfMessages; i++){ - publication = bus.post(new Object()).asynchronously(); + publication = bus.publishAsync(new Object()); } // wait for last publication while (!publication.isFinished()){ diff --git a/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java index ebe59b3..c2be80c 100644 --- a/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.AbstractMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java b/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java index 055a185..c0b381f 100644 --- a/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java +++ b/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java @@ -1,7 +1,7 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.annotations.Listener; import net.engio.mbassy.messages.StandardMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/ICountableListener.java b/src/test/java/net/engio/mbassy/listeners/ICountableListener.java index fe19756..4bd5592 100644 --- a/src/test/java/net/engio/mbassy/listeners/ICountableListener.java +++ b/src/test/java/net/engio/mbassy/listeners/ICountableListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.ICountable; /** diff --git a/src/test/java/net/engio/mbassy/listeners/IMessageListener.java b/src/test/java/net/engio/mbassy/listeners/IMessageListener.java index b499509..58fdcfa 100644 --- a/src/test/java/net/engio/mbassy/listeners/IMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/IMessageListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.IMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java index ffc9f2f..75da558 100644 --- a/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.IMultipartMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/MessagesListener.java b/src/test/java/net/engio/mbassy/listeners/MessagesListener.java index 6864bba..9631c5f 100644 --- a/src/test/java/net/engio/mbassy/listeners/MessagesListener.java +++ b/src/test/java/net/engio/mbassy/listeners/MessagesListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.MessageTypes; /** diff --git a/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java b/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java index ddb3bc3..a054237 100644 --- a/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.MultipartMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/ObjectListener.java b/src/test/java/net/engio/mbassy/listeners/ObjectListener.java index 387da15..9e9e1f4 100644 --- a/src/test/java/net/engio/mbassy/listeners/ObjectListener.java +++ b/src/test/java/net/engio/mbassy/listeners/ObjectListener.java @@ -4,7 +4,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; public class ObjectListener { diff --git a/src/test/java/net/engio/mbassy/listeners/Overloading.java b/src/test/java/net/engio/mbassy/listeners/Overloading.java index f3de29a..bc3a8d9 100644 --- a/src/test/java/net/engio/mbassy/listeners/Overloading.java +++ b/src/test/java/net/engio/mbassy/listeners/Overloading.java @@ -1,7 +1,7 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.annotations.Listener; import net.engio.mbassy.messages.AbstractMessage; /** diff --git a/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java index 620d032..9cda22d 100644 --- a/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.messages.StandardMessage; /**