From ff4dd6271f6d9c55b1b5dea17d954a25806369df Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 4 Feb 2015 02:33:47 +0100 Subject: [PATCH] Deleted configuration. Changed to single parameter for thread count in constructor --- .../java/net/engio/mbassy/BusFactory.java | 44 ----- .../java/net/engio/mbassy/IMessageBus.java | 12 -- src/main/java/net/engio/mbassy/MBassador.java | 155 ++++++++++++++++++ .../mbassy/bus/AbstractPubSubSupport.java | 8 +- .../bus/AbstractSyncAsyncMessageBus.java | 117 ------------- .../java/net/engio/mbassy/bus/MBassador.java | 73 --------- .../net/engio/mbassy/bus/SyncMessageBus.java | 57 ------- .../mbassy/bus/config/BusConfiguration.java | 29 ---- .../mbassy/bus/config/ConfigurationError.java | 10 -- .../net/engio/mbassy/bus/config/Feature.java | 126 -------------- .../mbassy/bus/config/IBusConfiguration.java | 25 --- .../bus/error/IPublicationErrorHandler.java | 1 - .../mbassy/bus/error/MessageBusException.java | 1 + .../engio/mbassy/listener/MessageHandler.java | 1 - .../subscription/SubscriptionManager.java | 5 +- src/test/java/net/engio/mbassy/AllTests.java | 1 - .../net/engio/mbassy/AsyncFIFOBusTest.java | 51 +++--- .../mbassy/CustomHandlerAnnotationTest.java | 3 +- .../net/engio/mbassy/DeadMessageTest.java | 5 +- .../java/net/engio/mbassy/MBassadorTest.java | 7 +- .../net/engio/mbassy/MethodDispatchTest.java | 2 +- .../engio/mbassy/SubscriptionManagerTest.java | 5 +- .../java/net/engio/mbassy/SyncBusTest.java | 12 -- .../engio/mbassy/SynchronizedHandlerTest.java | 6 +- .../engio/mbassy/common/MessageBusTest.java | 19 +-- .../net/engio/mbassy/common/TestUtil.java | 2 +- 26 files changed, 199 insertions(+), 578 deletions(-) delete mode 100644 src/main/java/net/engio/mbassy/BusFactory.java create mode 100644 src/main/java/net/engio/mbassy/MBassador.java delete mode 100644 src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java delete mode 100644 src/main/java/net/engio/mbassy/bus/MBassador.java delete mode 100644 src/main/java/net/engio/mbassy/bus/SyncMessageBus.java delete mode 100644 src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java delete mode 100644 src/main/java/net/engio/mbassy/bus/config/ConfigurationError.java delete mode 100644 src/main/java/net/engio/mbassy/bus/config/Feature.java delete mode 100644 src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java diff --git a/src/main/java/net/engio/mbassy/BusFactory.java b/src/main/java/net/engio/mbassy/BusFactory.java deleted file mode 100644 index c5ceca9..0000000 --- a/src/main/java/net/engio/mbassy/BusFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -package net.engio.mbassy; - -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.SyncMessageBus; -import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.config.Feature; - -/** - * The bus factory provides convenient factory methods for the most common bus use cases. - * - * @author bennidi - * Date: 3/30/14 - */ -public class BusFactory { - - /** - * Create a message bus supporting only synchronous message publication. - * All message publications will run in the calling thread, no bus internal - * multi-threading will occur. - * - * @return - */ - public static SyncMessageBus SynchronousOnly(){ - BusConfiguration syncPubSubCfg = new BusConfiguration(); - syncPubSubCfg.addFeature(Feature.SyncPubSub.Default()); - return new SyncMessageBus(syncPubSubCfg); - } - - /** - * Create a message bus supporting synchronous and asynchronous message publication. - * Asynchronous message publication will be handled by a single thread such that FIFO - * order of message processing is guaranteed. - * - * - * @return - */ - public static IMessageBus AsynchronousSequentialFIFO(){ - BusConfiguration asyncFIFOConfig = new BusConfiguration(); - asyncFIFOConfig.addFeature(Feature.SyncPubSub.Default()); - asyncFIFOConfig.addFeature(Feature.AsynchronousHandlerInvocation.Default(1, 1)); - asyncFIFOConfig.addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(1)); - return new MBassador(asyncFIFOConfig); - } -} diff --git a/src/main/java/net/engio/mbassy/IMessageBus.java b/src/main/java/net/engio/mbassy/IMessageBus.java index 6c52321..405f34c 100644 --- a/src/main/java/net/engio/mbassy/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/IMessageBus.java @@ -1,7 +1,5 @@ package net.engio.mbassy; -import java.util.concurrent.Executor; - import net.engio.mbassy.bus.error.ErrorHandlingSupport; /** @@ -59,16 +57,6 @@ import net.engio.mbassy.bus.error.ErrorHandlingSupport; */ public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport { - /** - * Get the executor service that is used for asynchronous message publications. - * The executor is passed to the message bus at creation time. - * - * Note: The executor can be obtained from the run time. See - * @return - */ - @Deprecated - Executor getExecutor(); - /** * Check whether any asynchronous message publications are pending to be processed * diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java new file mode 100644 index 0000000..bf9d98e --- /dev/null +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -0,0 +1,155 @@ +package net.engio.mbassy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import net.engio.mbassy.bus.AbstractPubSubSupport; +import net.engio.mbassy.bus.error.PublicationError; + +/** + * The base class for all message bus implementations with support for asynchronous message dispatch + */ +public class MBassador extends AbstractPubSubSupport implements IMessageBus { + + private final int numberOfMessageDispatchers; + + // all threads that are available for asynchronous message dispatching + private final List dispatchers; + + // all pending messages scheduled for asynchronous dispatch are queued here + private final BlockingQueue pendingMessages = new LinkedBlockingQueue(Integer.MAX_VALUE / 16); + + protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() { + + private final AtomicInteger threadID = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setDaemon(true);// do not prevent the JVM from exiting + thread.setName("Dispatcher-" + this.threadID.getAndIncrement()); + return thread; + } + }; + + + public MBassador() { + this(6); + } + + public MBassador(int numberOfMessageDispatchers) { + super(); + this.numberOfMessageDispatchers = numberOfMessageDispatchers; + + this.dispatchers = new ArrayList(numberOfMessageDispatchers); + initDispatcherThreads(); + } + + // initialize the dispatch workers + private void initDispatcherThreads() { + for (int i = 0; i < this.numberOfMessageDispatchers; i++) { + // each thread will run forever and process incoming + // message publication requests + Thread dispatcher = MessageDispatchThreadFactory.newThread(new Runnable() { + @Override + public void run() { + T message = null; + while (true) { + try { + message = MBassador.this.pendingMessages.take(); + publishMessage(message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Throwable t) { + handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message)); + } + } + } + }); + dispatcher.setName("Message dispatcher"); + this.dispatchers.add(dispatcher); + dispatcher.start(); + } + } + + + /** + * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) + * The call blocks until every messageHandler has processed the message. + * + * @param message + */ + @Override + public void publish(T message) { + try { + publishMessage(message); + } catch (Throwable e) { + handlePublicationError(new PublicationError() + .setMessage("Error during publication of message") + .setCause(e) + .setPublishedObject(message)); + } + } + + /** + * Execute the message publication asynchronously. The behavior of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call might block until the message can be placed in the queue. + * + * @return A message publication that can be used to access information about it's state + */ + @Override + public void publishAsync(T message) { + try { + this.pendingMessages.put(message); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); + } + } + + /** + * Execute the message publication asynchronously. The behaviour of this method depends on the + * configured queuing strategy: + *

+ * If an unbound queuing strategy is used the call returns immediately. + * If a bounded queue is used the call will block until the message can be placed in the queue + * or the timeout is reached. + * + * @return A message publication that wraps up the publication request + */ + @Override + public void publishAsync(T message, long timeout, TimeUnit unit) { + try { + this.pendingMessages.offer(message, timeout, unit); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); + } + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + shutdown(); + } + + @Override + public void shutdown() { + for (Thread dispatcher : this.dispatchers) { + dispatcher.interrupt(); + } + } + + @Override + public boolean hasPendingMessages() { + return this.pendingMessages.size() > 0; + } +} diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index bfb2638..c563444 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -6,8 +6,6 @@ import java.util.Collections; import java.util.List; import net.engio.mbassy.PubSubSupport; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.bus.error.ErrorHandlingSupport; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; @@ -28,10 +26,8 @@ public abstract class AbstractPubSubSupport implements PubSubSupport, Erro private final SubscriptionManager subscriptionManager; - public AbstractPubSubSupport(IBusConfiguration configuration) { - // configure the pub sub feature - Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class); - this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader()); + public AbstractPubSubSupport() { + this.subscriptionManager = new SubscriptionManager(); } @Override diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java deleted file mode 100644 index 004697e..0000000 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ /dev/null @@ -1,117 +0,0 @@ -package net.engio.mbassy.bus; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import net.engio.mbassy.IMessageBus; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; -import net.engio.mbassy.bus.error.PublicationError; - -/** - * The base class for all message bus implementations with support for asynchronous message dispatch - */ -public abstract class AbstractSyncAsyncMessageBus - extends AbstractPubSubSupport implements IMessageBus { - - // executor for asynchronous message handlers - private final ExecutorService executor; - - // all threads that are available for asynchronous message dispatching - private final List dispatchers; - - // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue pendingMessages = new LinkedBlockingQueue(Integer.MAX_VALUE/16); - - protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) { - super(configuration); - - // configure asynchronous message dispatch - Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class); - this.dispatchers = new ArrayList(asyncDispatch.getNumberOfMessageDispatchers()); - initDispatcherThreads(asyncDispatch); - - // configure asynchronous handler invocation - Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class); - this.executor = asyncInvocation.getExecutor(); - } - - // initialize the dispatch workers - private void initDispatcherThreads(Feature.AsynchronousMessageDispatch configuration) { - for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); i++) { - // each thread will run forever and process incoming - // message publication requests - Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() { - @Override - public void run() { - T message = null; - while (true) { - try { - message = AbstractSyncAsyncMessageBus.this.pendingMessages.take(); - publishMessage(message); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } catch(Throwable t){ - handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message)); - } - } - } - }); - dispatcher.setName("Message dispatcher"); - this.dispatchers.add(dispatcher); - dispatcher.start(); - } - } - - - // this method queues a message delivery request - protected void addAsynchronousPublication(T message) { - try { - this.pendingMessages.put(message); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); - } - } - - // this method queues a message delivery request - protected void addAsynchronousPublication(T message, long timeout, TimeUnit unit) { - try { - this.pendingMessages.offer(message, timeout, unit); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); - } - } - - @Override - protected void finalize() throws Throwable { - super.finalize(); - shutdown(); - } - - @Override - public void shutdown() { - for (Thread dispatcher : this.dispatchers) { - dispatcher.interrupt(); - } - if(this.executor != null) { - this.executor.shutdown(); - } - } - - @Override - public boolean hasPendingMessages() { - return this.pendingMessages.size() > 0; - } - - @Override - public Executor getExecutor() { - return this.executor; - } - -} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java deleted file mode 100644 index 29d43cc..0000000 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ /dev/null @@ -1,73 +0,0 @@ -package net.engio.mbassy.bus; - -import java.util.concurrent.TimeUnit; - -import net.engio.mbassy.IMessageBus; -import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; -import net.engio.mbassy.bus.error.PublicationError; - - -public class MBassador extends AbstractSyncAsyncMessageBus implements IMessageBus { - - public MBassador(IBusConfiguration configuration) { - super(configuration); - } - - public MBassador(){ - super(new BusConfiguration() - .addFeature(Feature.SyncPubSub.Default()) - .addFeature(Feature.AsynchronousHandlerInvocation.Default()) - .addFeature(Feature.AsynchronousMessageDispatch.Default())); - } - - /** - * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) - * The call blocks until every messageHandler has processed the message. - * - * @param message - */ - @Override - public void publish(T message) { - try { - publishMessage(message); - } catch (Throwable e) { - handlePublicationError(new PublicationError() - .setMessage("Error during publication of message") - .setCause(e) - .setPublishedObject(message)); - } - } - - - /** - * Execute the message publication asynchronously. The behaviour of this method depends on the - * configured queuing strategy: - *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call might block until the message can be placed in the queue. - * - * @return A message publication that can be used to access information about it's state - */ - @Override - public void publishAsync(T message) { - addAsynchronousPublication(message); - } - - - /** - * Execute the message publication asynchronously. The behaviour of this method depends on the - * configured queuing strategy: - *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call will block until the message can be placed in the queue - * or the timeout is reached. - * - * @return A message publication that wraps up the publication request - */ - @Override - public void publishAsync(T message, long timeout, TimeUnit unit) { - addAsynchronousPublication(message, timeout, unit); - } -} diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java deleted file mode 100644 index f9079e7..0000000 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ /dev/null @@ -1,57 +0,0 @@ -package net.engio.mbassy.bus; - -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; - -import net.engio.mbassy.IMessageBus; -import net.engio.mbassy.bus.config.IBusConfiguration; -import net.engio.mbassy.bus.error.PublicationError; - -/** - * A message bus implementation that offers only synchronous message publication. Using this bus - * will not create any new threads. - * - */ -public class SyncMessageBus extends AbstractPubSubSupport implements IMessageBus { - - - public SyncMessageBus(IBusConfiguration configuration) { - super(configuration); - } - - @Override - public void publish(T message) { - try { - publishMessage(message); - } catch (Throwable e) { - handlePublicationError(new PublicationError() - .setMessage("Error during publication of message") - .setCause(e) - .setPublishedObject(message)); - } - } - - @Override - public void publishAsync(T message) { - publish(message); - } - - @Override - public void publishAsync(T message, long timeout, TimeUnit unit) { - publish(message); - } - - @Override - public Executor getExecutor() { - return null; - } - - @Override - public boolean hasPendingMessages() { - return false; - } - - @Override - public void shutdown() { - } -} diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java deleted file mode 100644 index 17ce0e0..0000000 --- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java +++ /dev/null @@ -1,29 +0,0 @@ -package net.engio.mbassy.bus.config; - -import java.util.HashMap; -import java.util.Map; - -/** - * The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour. - */ -public class BusConfiguration implements IBusConfiguration { - - // the registered features - private Map, Feature> features = new HashMap, Feature>(); - - public BusConfiguration() { - super(); - } - - @Override - public T getFeature(Class feature) { - return (T)this.features.get(feature); - } - - @Override - public IBusConfiguration addFeature(Feature feature) { - this.features.put(feature.getClass(), feature); - return this; - } - -} diff --git a/src/main/java/net/engio/mbassy/bus/config/ConfigurationError.java b/src/main/java/net/engio/mbassy/bus/config/ConfigurationError.java deleted file mode 100644 index 2e4ed42..0000000 --- a/src/main/java/net/engio/mbassy/bus/config/ConfigurationError.java +++ /dev/null @@ -1,10 +0,0 @@ -package net.engio.mbassy.bus.config; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 8/29/14 - */ -public class ConfigurationError { -} diff --git a/src/main/java/net/engio/mbassy/bus/config/Feature.java b/src/main/java/net/engio/mbassy/bus/config/Feature.java deleted file mode 100644 index 72e8e70..0000000 --- a/src/main/java/net/engio/mbassy/bus/config/Feature.java +++ /dev/null @@ -1,126 +0,0 @@ -package net.engio.mbassy.bus.config; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import net.engio.mbassy.listener.MetadataReader; - -/** - * A feature defines the configuration of a specific functionality of a message bus. - * - * @author bennidi - * Date: 8/29/14 - */ -public interface Feature { - - - class SyncPubSub implements Feature { - - public static final SyncPubSub Default(){ - return new SyncPubSub() - .setMetadataReader(new MetadataReader()); - } - - private MetadataReader metadataReader; - - - public MetadataReader getMetadataReader() { - return metadataReader; - } - - public SyncPubSub setMetadataReader(MetadataReader metadataReader) { - this.metadataReader = metadataReader; - return this; - } - } - - class AsynchronousHandlerInvocation implements Feature { - - protected static final ThreadFactory MessageHandlerThreadFactory = new ThreadFactory() { - - private final AtomicInteger threadID = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread thread = Executors.defaultThreadFactory().newThread(r); - thread.setName("AsyncHandler-" + threadID.getAndIncrement()); - thread.setDaemon(true); - return thread; - } - }; - - public static final AsynchronousHandlerInvocation Default(){ - int numberOfCores = Runtime.getRuntime().availableProcessors(); - return Default(numberOfCores, numberOfCores * 2); - } - - public static final AsynchronousHandlerInvocation Default(int initialCoreThreads, int maximumCoreThreads){ - int numberOfCores = Runtime.getRuntime().availableProcessors(); - return new AsynchronousHandlerInvocation().setExecutor(new ThreadPoolExecutor(initialCoreThreads, maximumCoreThreads, 1, - TimeUnit.MINUTES, new LinkedBlockingQueue(), MessageHandlerThreadFactory)); - } - - private ExecutorService executor; - - public ExecutorService getExecutor() { - return executor; - } - - public AsynchronousHandlerInvocation setExecutor(ExecutorService executor) { - this.executor = executor; - return this; - } - } - - class AsynchronousMessageDispatch implements Feature { - - protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() { - - private final AtomicInteger threadID = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread thread = Executors.defaultThreadFactory().newThread(r); - thread.setDaemon(true);// do not prevent the JVM from exiting - thread.setName("Dispatcher-" + threadID.getAndIncrement()); - return thread; - } - }; - - public static final AsynchronousMessageDispatch Default(){ - return new AsynchronousMessageDispatch() - .setNumberOfMessageDispatchers(2) - .setDispatcherThreadFactory(MessageDispatchThreadFactory); - } - - - private int numberOfMessageDispatchers; - private ThreadFactory dispatcherThreadFactory; - - public int getNumberOfMessageDispatchers() { - return numberOfMessageDispatchers; - } - - public AsynchronousMessageDispatch setNumberOfMessageDispatchers(int numberOfMessageDispatchers) { - this.numberOfMessageDispatchers = numberOfMessageDispatchers; - return this; - } - - - public ThreadFactory getDispatcherThreadFactory() { - return dispatcherThreadFactory; - } - - public AsynchronousMessageDispatch setDispatcherThreadFactory(ThreadFactory dispatcherThreadFactory) { - this.dispatcherThreadFactory = dispatcherThreadFactory; - return this; - } - } - - -} diff --git a/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java deleted file mode 100644 index 48248bc..0000000 --- a/src/main/java/net/engio/mbassy/bus/config/IBusConfiguration.java +++ /dev/null @@ -1,25 +0,0 @@ -package net.engio.mbassy.bus.config; - -/** - * The configuration of message bus instances is feature driven, e.g. configuration parameters - * are grouped into {@link Feature}. - * - * Features can be added to a bus configuration to be used later in the instantiation process of the message bus. - * Each bus will look for the features it requires and configure them according to the provided configuration. If a required feature is not found the bus will publish a {@link ConfigurationError} - * to the {@link ConfigurationErrorHandler} - * - * @author bennidi. - */ -public interface IBusConfiguration{ - - /** - * Get a registered feature by its type (class). - * - * @param feature - * @param - * @return - */ - T getFeature(Class feature); - - IBusConfiguration addFeature(Feature feature); -} diff --git a/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java b/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java index 1bbfd31..92bc176 100644 --- a/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java +++ b/src/main/java/net/engio/mbassy/bus/error/IPublicationErrorHandler.java @@ -10,7 +10,6 @@ package net.engio.mbassy.bus.error; * @author bennidi * Date: 2/22/12 */ -@SuppressWarnings("PMD.UnusedModifier") public interface IPublicationErrorHandler { /** diff --git a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java index 01a62a3..ffde43c 100644 --- a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java +++ b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java @@ -7,6 +7,7 @@ package net.engio.mbassy.bus.error; * Date: 3/29/13 */ public class MessageBusException extends Exception { + private static final long serialVersionUID = 1L; public MessageBusException() { } diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/listener/MessageHandler.java index ea58b99..ea48915 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandler.java @@ -24,7 +24,6 @@ public class MessageHandler { private final boolean isSynchronized; - public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){ super(); diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java index 494df56..bbf9e71 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java @@ -30,7 +30,7 @@ import net.engio.mbassy.listener.MetadataReader; public class SubscriptionManager { // the metadata reader that is used to inspect objects passed to the subscribe method - private final MetadataReader metadataReader; + private final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type // this is the primary list for dispatching a specific message @@ -52,8 +52,7 @@ public class SubscriptionManager { private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - public SubscriptionManager(MetadataReader metadataReader) { - this.metadataReader = metadataReader; + public SubscriptionManager() { } diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 33839df..4567c1c 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -14,7 +14,6 @@ import org.junit.runners.Suite; WeakConcurrentSetTest.class, MBassadorTest.class, SyncBusTest.MBassadorTest.class, - SyncBusTest.SyncMessageBusTest.class, MetadataReaderTest.class, MethodDispatchTest.class, DeadMessageTest.class, diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java index cf3c56a..ab2120c 100644 --- a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -18,11 +18,11 @@ public class AsyncFIFOBusTest extends MessageBusTest { @Test public void testSingleThreadedSyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); + IMessageBus fifoBUs = new MBassador(); - List listeners = new LinkedList(); + List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ - SyncListener listener = new SyncListener(); + Listener listener = new Listener(); listeners.add(listener); fifoBUs.subscribe(listener); } @@ -34,14 +34,14 @@ public class AsyncFIFOBusTest extends MessageBusTest { } // publish in ascending order for(Integer message : messages) { - fifoBUs.publishAsync(message); + fifoBUs.publish(message); } while(fifoBUs.hasPendingMessages()) { pause(1000); } - for(SyncListener listener : listeners){ + for(Listener listener : listeners){ assertEquals(messages.length, listener.receivedSync.size()); for(int i=0; i < messages.length; i++){ assertEquals(messages[i], listener.receivedSync.get(i)); @@ -50,15 +50,14 @@ public class AsyncFIFOBusTest extends MessageBusTest { } - // NOTE: Can fail due to timing issues. @Test public void testSingleThreadedSyncAsyncFIFO(){ // create a fifo bus with 1000 concurrently subscribed listeners - IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO(); + IMessageBus fifoBUs = new MBassador(1); - List listeners = new LinkedList(); + List listeners = new LinkedList(); for(int i = 0; i < 1000 ; i++){ - SyncAsyncListener listener = new SyncAsyncListener(); + Listener listener = new Listener(); listeners.add(listener); fifoBUs.subscribe(listener); } @@ -69,42 +68,38 @@ public class AsyncFIFOBusTest extends MessageBusTest { messages[i] = i; } // publish in ascending order - for(Integer message : messages) { + for (Integer message : messages) { fifoBUs.publishAsync(message); } - while(fifoBUs.hasPendingMessages()) { + while (fifoBUs.hasPendingMessages()) { pause(2000); } - for(SyncAsyncListener listener : listeners){ - assertEquals(messages.length, listener.receivedSync.size()); - for(int i=0; i < messages.length; i++){ - assertEquals(messages[i], listener.receivedSync.get(i)); + for(Listener listener : listeners) { + List receivedSync = listener.receivedSync; + + synchronized (receivedSync) { + assertEquals(messages.length, receivedSync.size()); + + for(int i=0; i < messages.length; i++){ + assertEquals(messages[i], receivedSync.get(i)); + } } } } - public static class SyncListener { + public static class Listener { private List receivedSync = new LinkedList(); @Handler public void handleSync(Integer message){ - this.receivedSync.add(message); + synchronized (this.receivedSync) { + this.receivedSync.add(message); + } } } - - public static class SyncAsyncListener { - - private List receivedSync = new LinkedList(); - - @Handler - public void handleSync(Integer message){ - this.receivedSync.add(message); - } - } - } diff --git a/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java b/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java index 1a40cf3..3a6174c 100644 --- a/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java +++ b/src/test/java/net/engio/mbassy/CustomHandlerAnnotationTest.java @@ -10,7 +10,6 @@ import java.util.Set; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Synchronized; -import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.common.MessageBusTest; import org.junit.Test; @@ -95,7 +94,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest @Test public void testMetaHandlerFiltering() { - MBassador bus = createBus(SyncAsync()); + MBassador bus = createBus(); NamedMessageListener listener = new NamedMessageListener(); bus.subscribe( listener ); diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index aea1035..89f57cc 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -4,7 +4,6 @@ import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.bus.DeadMessage; -import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; @@ -33,7 +32,7 @@ public class DeadMessageTest extends MessageBusTest{ @Test public void testDeadMessage(){ - final MBassador bus = createBus(SyncAsync()); + final MBassador bus = createBus(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class) @@ -69,7 +68,7 @@ public class DeadMessageTest extends MessageBusTest{ @Test public void testUnsubscribingAllListeners() { - final MBassador bus = createBus(SyncAsync()); + final MBassador bus = createBus(); ListenerFactory deadMessageListener = new ListenerFactory() .create(InstancesPerListener, DeadMessagHandler.class) .create(InstancesPerListener, Object.class); diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java index cc36b87..289cb0d 100644 --- a/src/test/java/net/engio/mbassy/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -2,7 +2,6 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; @@ -35,7 +34,7 @@ public class MBassadorTest extends MessageBusTest { ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, Listeners.synchronous()) .create(InstancesPerListener, Listeners.noHandlers()); - final MBassador bus = createBus(SyncAsync(), listeners); + final MBassador bus = createBus(listeners); Runnable publishAndCheck = new Runnable() { @@ -69,7 +68,7 @@ public class MBassadorTest extends MessageBusTest { ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, Listeners.noHandlers()); - final MBassador bus = createBus(SyncAsync(), listeners); + final MBassador bus = createBus(listeners); final MessageManager messageManager = new MessageManager(); @@ -103,7 +102,7 @@ public class MBassadorTest extends MessageBusTest { } }; - final MBassador bus = new MBassador(SyncAsync()); + final MBassador bus = new MBassador(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); diff --git a/src/test/java/net/engio/mbassy/MethodDispatchTest.java b/src/test/java/net/engio/mbassy/MethodDispatchTest.java index 3fa868d..9e4db75 100644 --- a/src/test/java/net/engio/mbassy/MethodDispatchTest.java +++ b/src/test/java/net/engio/mbassy/MethodDispatchTest.java @@ -41,7 +41,7 @@ public class MethodDispatchTest extends MessageBusTest{ @Test public void testDispatch1(){ - IMessageBus bus = createBus(SyncAsync()); + IMessageBus bus = createBus(); EventListener2 listener2 = new EventListener2(); bus.subscribe(listener2); bus.publish("jfndf"); diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java index 19e94b3..be2117b 100644 --- a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -5,7 +5,6 @@ import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.SubscriptionValidator; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listeners.AbstractMessageListener; import net.engio.mbassy.listeners.ICountableListener; import net.engio.mbassy.listeners.IMessageListener; @@ -163,7 +162,7 @@ public class SubscriptionManagerTest extends AssertSupport { Overloading.ListenerBase.class, Overloading.ListenerSub.class); - SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader()); + SubscriptionManager subscriptionManager = new SubscriptionManager(); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) @@ -182,7 +181,7 @@ public class SubscriptionManagerTest extends AssertSupport { } private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){ - final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader()); + final SubscriptionManager subscriptionManager = new SubscriptionManager(); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index fa121fc..6f48ffe 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -2,7 +2,6 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.ConcurrentExecutor; @@ -119,17 +118,6 @@ public abstract class SyncBusTest extends MessageBusTest { } - public static class SyncMessageBusTest extends SyncBusTest { - - - @Override - protected IMessageBus getSyncMessageBus() { - return BusFactory.SynchronousOnly(); - } - } - - - static class IncrementingMessage{ private int count = 1; diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index 820b35f..39353bc 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -6,8 +6,6 @@ import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Synchronized; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.common.MessageBusTest; import org.junit.Test; @@ -27,10 +25,8 @@ public class SynchronizedHandlerTest extends MessageBusTest { @Test public void testSynchronizedWithSynchronousInvocation(){ List handlers = new LinkedList(); - IBusConfiguration config = SyncAsync(); - config.getFeature(Feature.AsynchronousMessageDispatch.class).setNumberOfMessageDispatchers(6); - IMessageBus bus = createBus(config); + IMessageBus bus = createBus(); for(int i = 0; i < numberOfListeners; i++){ SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery(); handlers.add(handler); diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index 2b4bf9b..d5334fc 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -1,10 +1,7 @@ package net.engio.mbassy.common; import junit.framework.Assert; -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.config.BusConfiguration; -import net.engio.mbassy.bus.config.Feature; -import net.engio.mbassy.bus.config.IBusConfiguration; +import net.engio.mbassy.MBassador; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.messages.MessageTypes; @@ -45,21 +42,15 @@ public abstract class MessageBusTest extends AssertSupport { } } - public static IBusConfiguration SyncAsync() { - return new BusConfiguration() - .addFeature(Feature.SyncPubSub.Default()) - .addFeature(Feature.AsynchronousHandlerInvocation.Default()) - .addFeature(Feature.AsynchronousMessageDispatch.Default()); - } - public MBassador createBus(IBusConfiguration configuration) { - MBassador bus = new MBassador(configuration); + public MBassador createBus() { + MBassador bus = new MBassador(); bus.addErrorHandler(TestFailingHandler); return bus; } - public MBassador createBus(IBusConfiguration configuration, ListenerFactory listeners) { - MBassador bus = new MBassador(configuration); + public MBassador createBus(ListenerFactory listeners) { + MBassador bus = new MBassador(); bus.addErrorHandler(TestFailingHandler); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); return bus; diff --git a/src/test/java/net/engio/mbassy/common/TestUtil.java b/src/test/java/net/engio/mbassy/common/TestUtil.java index 7455d91..0854f65 100644 --- a/src/test/java/net/engio/mbassy/common/TestUtil.java +++ b/src/test/java/net/engio/mbassy/common/TestUtil.java @@ -1,7 +1,7 @@ package net.engio.mbassy.common; +import net.engio.mbassy.MBassador; import net.engio.mbassy.PubSubSupport; -import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.subscription.SubscriptionManager; import java.util.Iterator;