implemented #73

reworked bus configuration
This commit is contained in:
Benjamin Diedrichsen 2014-08-31 12:33:37 +02:00
parent 86830b847d
commit 5c56360bf8
25 changed files with 340 additions and 344 deletions

View File

@ -34,18 +34,13 @@ At its core MBassador offers the following features:
+ <em><strong>Annotation driven</em></strong>: To define and customize a message handler simply mark it with @Handler annotation
+ <em><strong>Delivers everything</em></strong>: Messages must not implement any interface and can be of any type. It is
possible though to define an upper bound of the message type using generics. The class hierarchy of a message is considered during message delivery.
This means that handlers will also receive subtypes of the message type they are listening for, e.g. a handler of Object.class receives everything.
possible though to define an upper bound of the message type using generics. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for, e.g. a handler of Object.class receives everything.
+ <em><strong>Synchronous and asynchronous message delivery</em></strong>: A handler can be invoked to handle a message either synchronously or
asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method
blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
+ <em><strong>Weak references</em></strong>: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable
in certain environments where listeners are managed by frameworks, i.e. Spring, Guice etc. Just stuff everything into the message bus, it will
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
+ <em><strong>Weak references</em></strong>: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable in certain environments where listeners are managed by frameworks, i.e. Spring, Guice etc. Just stuff everything into the message bus, it will ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
+ <em><strong>Strong references</em></strong>: Instead of using weak references, a listener can be configured to be referenced using strong references using @Listener
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
a single message handler
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler
+ <em><strong>Message envelopes</em></strong>: Message handlers can declare to receive an enveloped message. The envelope can wrap different
types of messages. This allows for a single handler to handle multiple, unrelated message types.
+ <em><strong>Handler priorities</em></strong>: A handler can be associated with a priority to influence the order in which messages are delivered when multiple matching handlers exist
@ -55,12 +50,8 @@ can be handled by registering listeners that handle DeadMessage.
+ <em><strong>FilteredMessage event</em></strong>: Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message.
FilteredMessage events can be handled by registering listeners that handle FilteredMessage.
+ <em><strong>Synchronization</em></strong>: It is possible to ensure that a handler is invoked non-concurrently,i.e. making it thread-safe by adding @Synchronized
+ <em><strong>Extensibility</em></strong>:MBassador is designed to be extensible with custom implementations of various components like message
dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different
kinds of objects. A configuration object is used to customize the different configurable parts
+ <em><strong>Ease of Use</em></strong>: Using MBassador in your project is very easy. Create as many instances of MBassador as you like (usually a singleton will do),
mark and configure your message handlers with @Handler annotations and finally register the listeners at any MBassador instance. Start
sending messages to your listeners using one of MBassador's publication methods (sync or async). Done!
+ <em><strong>Extensibility</em></strong>: MBassador is designed to be extensible with custom implementations of various components like message dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different kinds of objects. A configuration object is used to customize the different configurable parts (Features)
+ <em><strong>Ease of Use</em></strong>: Using MBassador in your project is very easy. Create as many instances of MBassador as you like (usually a singleton will do), mark and configure your message handlers with @Handler annotations and finally register the listeners at any MBassador instance. Start sending messages to your listeners using one of MBassador's publication methods (sync or async). Done!
@ -138,18 +129,16 @@ Message publication:
bus.post(subMessage).now(); // same as above
<h2>Installation</h2>
Beginning with version 1.1.0 MBassador is available from the Maven Central Repository using the following coordinates:
MBassador is available from the Maven Central Repository using the following coordinates:
```xml
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.1.10</version>
<version>1.1.11</version>
</dependency>
```
You can also download the latest binary release here: http://mvnrepository.com/artifact/net.engio/mbassador
Of course you can always clone the repository and build from source.
You can also download the latest binary release from the official [maven repository](http://mvnrepository.com/artifact/net.engio/mbassador). Of course you can always clone the repository and build from source.
<h2>Wiki</h2>
There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus
@ -161,6 +150,12 @@ to avoid confusion and misunderstanding.
<h3>1.1.11</h3>
+ Added support for conditional handlers using Java EL. Thanks to Bernd Rosstauscher
for the initial implementation.
+ BREAKING CHANGES in BusConfiguration
++ Complete redesign of configuration setup using Features instead of simple get/set parameters. This will allow
to flexibly combine features and still be able to exclude those not available in certain environments,for example, threading and reflection in GWT (this will be part of future releases)
++ Properties formerly located in BusConfiguration now moved to their respective Feature class
++ Removed all SyncXX related interfaces and config implementations. There is now only one `BusConfiguration`
with its corresponding interface which will be used for all types of message bus implementations
<h3>1.1.10</h3>

View File

@ -2,7 +2,8 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
@ -31,13 +32,15 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
private final BusRuntime runtime;
public AbstractPubSubSupport(ISyncBusConfiguration configuration) {
public AbstractPubSubSupport(IBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers());
this.subscriptionManager = configuration.getSubscriptionManagerProvider()
.createManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory(), runtime);
this.publicationFactory = configuration.getMessagePublicationFactory();
// configure the pub sub feature
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider()
.createManager(pubSubFeature.getMetadataReader(),
pubSubFeature.getSubscriptionFactory(), runtime);
this.publicationFactory = pubSubFeature.getPublicationFactory();
}
protected MessagePublication.Factory getPublicationFactory() {

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
@ -32,21 +33,26 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
super(configuration);
this.executor = configuration.getExecutorForAsynchronousHandlers();
// configure asynchronous message dispatch
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
pendingMessages = asyncDispatch.getPendingMessages();
dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
initDispatcherThreads(asyncDispatch);
// configure asynchronous handler invocation
Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class);
this.executor = asyncInvocation.getExecutor();
getRuntime().add(BusRuntime.Properties.AsynchronousHandlerExecutor, executor);
pendingMessages = configuration.getPendingMessagesQueue();
dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
initDispatcherThreads(configuration);
}
// initialize the dispatch workers
private void initDispatcherThreads(IBusConfiguration configuration) {
private void initDispatcherThreads(Feature.AsynchronousMessageDispatch configuration) {
for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); i++) {
// each thread will run forever and process incoming
// message publication requests
Thread dispatcher = configuration.getThreadFactoryForAsynchronousMessageDispatch().newThread(new Runnable() {
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
public void run() {
while (true) {
MessagePublication publication = null;

View File

@ -3,7 +3,7 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.SyncBusConfiguration;
import net.engio.mbassy.bus.config.Feature;
/**
* The bus factory provides convenient factory methods for the most common bus use cases.
@ -21,7 +21,9 @@ public class BusFactory {
* @return
*/
public static ISyncMessageBus SynchronousOnly(){
return new SyncMessageBus(new SyncBusConfiguration());
BusConfiguration syncPubSubCfg = new BusConfiguration();
syncPubSubCfg.addFeature(Feature.SyncPubSub.Default());
return new SyncMessageBus(syncPubSubCfg);
}
/**
@ -33,6 +35,10 @@ public class BusFactory {
* @return
*/
public static IMessageBus AsynchronousSequentialFIFO(){
return new MBassador(BusConfiguration.Default(1,1,1));
BusConfiguration asyncFIFOConfig = new BusConfiguration();
asyncFIFOConfig.addFeature(Feature.SyncPubSub.Default());
asyncFIFOConfig.addFeature(Feature.AsynchronousHandlerInvocation.Default(1, 1));
asyncFIFOConfig.addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(1));
return new MBassador(asyncFIFOConfig);
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit;
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> implements IMessageBus<T, SyncAsyncPostCommand<T>> {
public MBassador(BusConfiguration configuration) {
public MBassador(IBusConfiguration configuration) {
super(configuration);
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.IPublicationCommand;
@ -13,7 +13,7 @@ import net.engio.mbassy.bus.publication.IPublicationCommand;
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements ISyncMessageBus<T, SyncMessageBus.SyncPostCommand> {
public SyncMessageBus(ISyncBusConfiguration configuration) {
public SyncMessageBus(IBusConfiguration configuration) {
super(configuration);
}

View File

@ -1,201 +1,51 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.Map;
/**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
*
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration implements IBusConfiguration {
protected static final ThreadFactory AsynchronousHandlerThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("AsyncHandler-" + threadID.getAndIncrement());
thread.setDaemon(true);
return thread;
}
};
protected static final ThreadFactory DispatcherThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);// do not prevent the JVM from exiting
thread.setName("Dispatcher-" + threadID.getAndIncrement());
return thread;
}
};
/**
* Creates a new instance, using the Default settings of 2 dispatchers, and
* Creates a new instance, using the default settings of 2 dispatchers, and
* asynchronous handlers with an initial count equal to the number of
* available processors in the machine, with maximum count equal to
* 2 * the number of available processors. Uses {@link Runtime#availableProcessors()} to
* determine the number of available processors
*
* @return a Default BusConfiguration
*/
public static BusConfiguration Default() {
return Default(2);
}
/**
* Creates a new instance, using the specified number of dispatchers, and
* asynchronous handlers with an initial count equal to the number of
* available processors in the machine, with maximum count equal to
* 2 * the number of available processors. Uses {@link Runtime#availableProcessors()} to
* determine the number of available processors
*
* @return a Default BusConfiguration
*/
public static BusConfiguration Default(int numberOfDispatchers) {
int numberOfCoreThreads = Runtime.getRuntime().availableProcessors();
return Default(numberOfDispatchers, numberOfCoreThreads, numberOfCoreThreads * 2);
}
/**
* Creates a new instance, using the specified number of dispatchers, and
* asynchronous handlers with initial threads and maximum threads specified by the calling
* parameters.
*
* @return a Default BusConfiguration
*/
public static BusConfiguration Default(int numberOfDispatchers, int initialCoreThreads, int maximumCoreThreads) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(initialCoreThreads, maximumCoreThreads, 1,
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), AsynchronousHandlerThreadFactory);
return Default(numberOfDispatchers, executor);
}
/**
* Creates a new instance, using the specified number of dispatchers, and
* asynchronous handlers that use the provided ThreadPoolExecutor.
*
* @return a Default BusConfiguration
*/
public static BusConfiguration Default(int numberOfDispatchers, ThreadPoolExecutor executor) {
*
* @deprecated Use feature driven configuration instead
**/
@Deprecated()
public static BusConfiguration SyncAsync() {
BusConfiguration defaultConfig = new BusConfiguration();
defaultConfig.setExecutorForAsynchronousHandlers(executor);
defaultConfig.setMetadataReader(new MetadataReader());
defaultConfig.setSubscriptionFactory(new SubscriptionFactory());
defaultConfig.setNumberOfMessageDispatchers(numberOfDispatchers);
defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory());
defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory);
defaultConfig.setSubscriptionManagerProvider(new SubscriptionManagerProvider());
defaultConfig.addFeature(Feature.SyncPubSub.Default());
defaultConfig.addFeature(Feature.AsynchronousHandlerInvocation.Default());
defaultConfig.addFeature(Feature.AsynchronousMessageDispatch.Default());
return defaultConfig;
}
public static BusConfiguration Empty(){
return new BusConfiguration();
}
// the registered features
private Map<Class<? extends Feature>, Feature> features = new HashMap<Class<? extends Feature>, Feature>();
protected int numberOfMessageDispatchers;
protected ExecutorService executor;
protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
protected ThreadFactory dispatcherThreadFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public void setPendingMessagesQueue(BlockingQueue<MessagePublication> pendingMessagesQueue) {
this.pendingMessagesQueue = pendingMessagesQueue;
}
protected BlockingQueue<MessagePublication> pendingMessagesQueue;
private BusConfiguration() {
public BusConfiguration() {
super();
}
@Override
public int getNumberOfMessageDispatchers() {
return numberOfMessageDispatchers > 0 ? numberOfMessageDispatchers : 2;
}
public BusConfiguration setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
return this;
}
@Override
public ExecutorService getExecutorForAsynchronousHandlers() {
return executor;
public <T extends Feature> T getFeature(Class<T> feature) {
return (T)features.get(feature);
}
@Override
public BlockingQueue<MessagePublication> getPendingMessagesQueue() {
return new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE);
}
@Override
public ThreadFactory getThreadFactoryForAsynchronousMessageDispatch() {
return dispatcherThreadFactory;
}
public BusConfiguration setThreadFactoryForAsynchronousMessageDispatch(ThreadFactory factory) {
dispatcherThreadFactory = factory;
return this;
}
public BusConfiguration setExecutorForAsynchronousHandlers(ExecutorService executor) {
this.executor = executor;
public IBusConfiguration addFeature(Feature feature) {
features.put(feature.getClass(), feature);
return this;
}
@Override
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public BusConfiguration setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
return this;
}
@Override
public MetadataReader getMetadataReader() {
return metadataReader;
}
public BusConfiguration setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return this;
}
@Override
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public BusConfiguration setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public BusConfiguration setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return this;
public IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler) {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
}

View File

@ -0,0 +1,10 @@
package net.engio.mbassy.bus.config;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 8/29/14
*/
public class ConfigurationError {
}

View File

@ -0,0 +1,10 @@
package net.engio.mbassy.bus.config;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 8/29/14
*/
public interface ConfigurationErrorHandler {
}

View File

@ -0,0 +1,172 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A feature defines the configuration of a specific functionality of a message bus.
*
* @author bennidi
* Date: 8/29/14
*/
public interface Feature {
class SyncPubSub implements Feature{
public static final SyncPubSub Default(){
return new SyncPubSub()
.setMetadataReader(new MetadataReader())
.setPublicationFactory(new MessagePublication.Factory())
.setSubscriptionFactory(new SubscriptionFactory())
.setSubscriptionManagerProvider(new SubscriptionManagerProvider());
}
private MessagePublication.Factory publicationFactory;
private MetadataReader metadataReader;
private SubscriptionFactory subscriptionFactory;
private ISubscriptionManagerProvider subscriptionManagerProvider;
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public SyncPubSub setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public SyncPubSub setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
public MetadataReader getMetadataReader() {
return metadataReader;
}
public SyncPubSub setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return this;
}
/**
* The message publication factory is used to wrap a published message
* in a {@link MessagePublication} for processing.
* @return The factory to be used by the bus to create the publications
*/
public MessagePublication.Factory getPublicationFactory() {
return publicationFactory;
}
public SyncPubSub setPublicationFactory(MessagePublication.Factory publicationFactory) {
this.publicationFactory = publicationFactory;
return this;
}
}
class AsynchronousHandlerInvocation implements Feature{
protected static final ThreadFactory MessageHandlerThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("AsyncHandler-" + threadID.getAndIncrement());
thread.setDaemon(true);
return thread;
}
};
public static final AsynchronousHandlerInvocation Default(){
int numberOfCores = Runtime.getRuntime().availableProcessors();
return Default(numberOfCores, numberOfCores * 2);
}
public static final AsynchronousHandlerInvocation Default(int initialCoreThreads, int maximumCoreThreads){
int numberOfCores = Runtime.getRuntime().availableProcessors();
return new AsynchronousHandlerInvocation().setExecutor(new ThreadPoolExecutor(initialCoreThreads, maximumCoreThreads, 1,
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), MessageHandlerThreadFactory));
}
private ExecutorService executor;
public ExecutorService getExecutor() {
return executor;
}
public AsynchronousHandlerInvocation setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}
}
class AsynchronousMessageDispatch implements Feature{
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);// do not prevent the JVM from exiting
thread.setName("Dispatcher-" + threadID.getAndIncrement());
return thread;
}
};
public static final AsynchronousMessageDispatch Default(){
return new AsynchronousMessageDispatch()
.setNumberOfMessageDispatchers(2)
.setDispatcherThreadFactory(MessageDispatchThreadFactory)
.setMessageQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
}
private int numberOfMessageDispatchers;
private BlockingQueue<MessagePublication> pendingMessages;
private ThreadFactory dispatcherThreadFactory;
public int getNumberOfMessageDispatchers() {
return numberOfMessageDispatchers;
}
public AsynchronousMessageDispatch setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
return this;
}
public BlockingQueue<MessagePublication> getPendingMessages() {
return pendingMessages;
}
public AsynchronousMessageDispatch setMessageQueue(BlockingQueue<MessagePublication> pendingMessages) {
this.pendingMessages = pendingMessages;
return this;
}
public ThreadFactory getDispatcherThreadFactory() {
return dispatcherThreadFactory;
}
public AsynchronousMessageDispatch setDispatcherThreadFactory(ThreadFactory dispatcherThreadFactory) {
this.dispatcherThreadFactory = dispatcherThreadFactory;
return this;
}
}
}

View File

@ -1,26 +1,30 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 8/16/13
* Time: 9:56 AM
* To change this template use File | Settings | File Templates.
* The configuration of message bus instances is feature driven, e.g. configuration parameters
* are grouped into {@link Feature}.
*
* Features can be added to a bus configuration to be used later in the instantiation process of the message bus.
* Each bus will look for the features it requires and configure them according to the provided configuration. If a required feature is not found the bus will publish a {@link ConfigurationError}
* to the {@link ConfigurationErrorHandler}
*
* @author bennidi.
*/
public interface IBusConfiguration extends ISyncBusConfiguration {
public interface IBusConfiguration{
int getNumberOfMessageDispatchers();
/**
* Get a registered feature by its type (class).
*
* @param feature
* @param <T>
* @return
*/
<T extends Feature> T getFeature(Class<T> feature);
ExecutorService getExecutorForAsynchronousHandlers();
IBusConfiguration addFeature(Feature feature);
IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler);
BlockingQueue<MessagePublication> getPendingMessagesQueue();
ThreadFactory getThreadFactoryForAsynchronousMessageDispatch();
}

View File

@ -1,26 +0,0 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
/**
* The configuration options for the synchronous message bus {@link net.engio.mbassy.bus.SyncMessageBus}
*/
public interface ISyncBusConfiguration {
/**
* The message publication factory is used to wrap a published message
* and while it is being processed
* @return The factory to be used by the bus to create the publications
*/
MessagePublication.Factory getMessagePublicationFactory();
MetadataReader getMetadataReader();
SubscriptionFactory getSubscriptionFactory();
ISubscriptionManagerProvider getSubscriptionManagerProvider();
}

View File

@ -1,63 +0,0 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public class SyncBusConfiguration<C extends SyncBusConfiguration<C>> implements ISyncBusConfiguration {
protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public SyncBusConfiguration() {
this.metadataReader = new MetadataReader();
this.subscriptionFactory = new SubscriptionFactory();
this.messagePublicationFactory = new MessagePublication.Factory();
this.subscriptionManagerProvider = new SubscriptionManagerProvider();
}
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
}
public MetadataReader getMetadataReader() {
return metadataReader;
}
public C setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return (C) this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public C setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return (C) this;
}
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public C setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return (C) this;
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.bus.error;
/**
* Todo: Add javadoc
* The universal exception type for message bus implementations.
*
* @author bennidi
* Date: 3/29/13

View File

@ -8,7 +8,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
import java.util.Collection;
/**
* Todo: Add javadoc
* This is the base class for handler invocations that already implements all context related methods only leaving the implementation of the actual invocation mechanism to the concrete subclass.
*
* @author bennidi
* Date: 3/29/13

View File

@ -94,7 +94,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testSimpleStringCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("TEST", 0);
@ -110,7 +110,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testSimpleNumberCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 5);
@ -125,7 +125,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testHandleCombinedEL() throws Exception {
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 3);
@ -140,7 +140,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testNotMatchingAnyCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 0);
@ -154,7 +154,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testHandleMethodAccessEL() throws Exception {
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("XYZ", 1);

View File

@ -111,7 +111,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
@Test
public void testMetaHandlerFiltering() {
MBassador bus = getBus( BusConfiguration.Default() );
MBassador bus = getBus( BusConfiguration.SyncAsync() );
NamedMessageListener listener = new NamedMessageListener();
bus.subscribe( listener );

View File

@ -32,7 +32,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test
public void testDeadMessage(){
final MBassador bus = getBus(BusConfiguration.Default());
final MBassador bus = getBus(BusConfiguration.SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
@ -70,7 +70,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test
public void testUnsubscribingAllListeners() {
final MBassador bus = getBus(BusConfiguration.Default());
final MBassador bus = getBus(BusConfiguration.SyncAsync());
ListenerFactory deadMessageListener = new ListenerFactory()
.create(InstancesPerListener, DeadMessagHandler.class)
.create(InstancesPerListener, Object.class);

View File

@ -31,7 +31,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
@ -56,7 +56,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
@ -102,7 +102,7 @@ public class FilterTest extends MessageBusTest {
@Test
public void testSubtypesOnly(){
MBassador bus = getBus(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(100, TestMessageHandler.class);

View File

@ -28,7 +28,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.synchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
Runnable publishAndCheck = new Runnable() {
@ -62,7 +62,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@ -94,7 +94,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
final MessageManager messageManager = new MessageManager();
@ -132,7 +132,7 @@ public class MBassadorTest extends MessageBusTest {
}
};
final MBassador bus = new MBassador(BusConfiguration.Default());
final MBassador bus = new MBassador(BusConfiguration.SyncAsync());
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -34,6 +34,14 @@ public class MetadataReaderTest extends AssertSupport {
validator.check(listener);
}
/*
public void testInterfaced() {
MessageListener listener = reader.getMessageListener(InterfacedListener.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(1, Object.class);
validator.check(listener);
} WIP */
@Test
public void testListenerWithInheritance() {
@ -164,7 +172,7 @@ public class MetadataReaderTest extends AssertSupport {
}
@Handler
@Enveloped(messages = {Number.class})
@Enveloped(messages = Number.class)
public void handleEnveloped2(MessageEnvelope o) {
}
@ -175,11 +183,26 @@ public class MetadataReaderTest extends AssertSupport {
// narrow to integer
@Handler
@Enveloped(messages = {Integer.class})
@Enveloped(messages = Integer.class)
public void handleEnveloped2(MessageEnvelope o) {
}
}
public static interface ListenerInterface{
@Handler
@Enveloped(messages = Object.class)
void handle(MessageEnvelope envelope);
}
public class InterfacedListener implements ListenerInterface{
@Override
public void handle(MessageEnvelope envelope) {
//
}
}
}

View File

@ -41,7 +41,7 @@ public class MethodDispatchTest extends MessageBusTest{
@Test
public void testDispatch1(){
IMessageBus bus = getBus(BusConfiguration.Default());
IMessageBus bus = getBus(BusConfiguration.SyncAsync());
EventListener2 listener2 = new EventListener2();
bus.subscribe(listener2);
bus.post("jfndf").now();

View File

@ -178,7 +178,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new MBassador(BusConfiguration.Default());
return new MBassador(BusConfiguration.SyncAsync());
}
}

View File

@ -3,6 +3,8 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
@ -28,8 +30,10 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Test
public void testSynchronizedWithSynchronousInvocation(){
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
IMessageBus bus = getBus(BusConfiguration.Default()
.setNumberOfMessageDispatchers(6));
IBusConfiguration config = BusConfiguration.SyncAsync();
config.getFeature(Feature.AsynchronousMessageDispatch.class)
.setNumberOfMessageDispatchers(6);
IMessageBus bus = getBus(config);
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
handlers.add(handler);
@ -54,8 +58,10 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Test
public void testSynchronizedWithAsSynchronousInvocation(){
List<SynchronizedWithAsynchronousDelivery> handlers = new LinkedList<SynchronizedWithAsynchronousDelivery>();
IMessageBus bus = getBus(BusConfiguration.Default()
.setNumberOfMessageDispatchers(6));
IBusConfiguration config = BusConfiguration.SyncAsync();
config.getFeature(Feature.AsynchronousMessageDispatch.class)
.setNumberOfMessageDispatchers(6);
IMessageBus bus = getBus(config);
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithAsynchronousDelivery handler = new SynchronizedWithAsynchronousDelivery();
handlers.add(handler);
@ -66,7 +72,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
bus.post(new Object()).asynchronously();
}
// TODO: wait for publication
// TODO: wait for publication to finish
pause(10000);
for(SynchronizedWithAsynchronousDelivery handler : handlers){

View File

@ -3,7 +3,7 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.messages.MessageTypes;
@ -43,13 +43,13 @@ public abstract class MessageBusTest extends AssertSupport {
mes.reset();
}
public MBassador getBus(BusConfiguration configuration) {
public MBassador getBus(IBusConfiguration configuration) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
return bus;
}
public MBassador getBus(BusConfiguration configuration, ListenerFactory listeners) {
public MBassador getBus(IBusConfiguration configuration, ListenerFactory listeners) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);