Deleted configuration. Changed to single parameter for thread count in constructor
This commit is contained in:
parent
226a5f80a2
commit
ff4dd6271f
|
@ -1,44 +0,0 @@
|
|||
package net.engio.mbassy;
|
||||
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.bus.SyncMessageBus;
|
||||
import net.engio.mbassy.bus.config.BusConfiguration;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
|
||||
/**
|
||||
* The bus factory provides convenient factory methods for the most common bus use cases.
|
||||
*
|
||||
* @author bennidi
|
||||
* Date: 3/30/14
|
||||
*/
|
||||
public class BusFactory {
|
||||
|
||||
/**
|
||||
* Create a message bus supporting only synchronous message publication.
|
||||
* All message publications will run in the calling thread, no bus internal
|
||||
* multi-threading will occur.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static SyncMessageBus SynchronousOnly(){
|
||||
BusConfiguration syncPubSubCfg = new BusConfiguration();
|
||||
syncPubSubCfg.addFeature(Feature.SyncPubSub.Default());
|
||||
return new SyncMessageBus(syncPubSubCfg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a message bus supporting synchronous and asynchronous message publication.
|
||||
* Asynchronous message publication will be handled by a single thread such that FIFO
|
||||
* order of message processing is guaranteed.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static IMessageBus AsynchronousSequentialFIFO(){
|
||||
BusConfiguration asyncFIFOConfig = new BusConfiguration();
|
||||
asyncFIFOConfig.addFeature(Feature.SyncPubSub.Default());
|
||||
asyncFIFOConfig.addFeature(Feature.AsynchronousHandlerInvocation.Default(1, 1));
|
||||
asyncFIFOConfig.addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(1));
|
||||
return new MBassador(asyncFIFOConfig);
|
||||
}
|
||||
}
|
|
@ -1,7 +1,5 @@
|
|||
package net.engio.mbassy;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
|
||||
|
||||
/**
|
||||
|
@ -59,16 +57,6 @@ import net.engio.mbassy.bus.error.ErrorHandlingSupport;
|
|||
*/
|
||||
public interface IMessageBus<T> extends PubSubSupport<T>, ErrorHandlingSupport {
|
||||
|
||||
/**
|
||||
* Get the executor service that is used for asynchronous message publications.
|
||||
* The executor is passed to the message bus at creation time.
|
||||
*
|
||||
* Note: The executor can be obtained from the run time. See
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
Executor getExecutor();
|
||||
|
||||
/**
|
||||
* Check whether any asynchronous message publications are pending to be processed
|
||||
*
|
||||
|
|
155
src/main/java/net/engio/mbassy/MBassador.java
Normal file
155
src/main/java/net/engio/mbassy/MBassador.java
Normal file
|
@ -0,0 +1,155 @@
|
|||
package net.engio.mbassy;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.engio.mbassy.bus.AbstractPubSubSupport;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
|
||||
/**
|
||||
* The base class for all message bus implementations with support for asynchronous message dispatch
|
||||
*/
|
||||
public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
||||
|
||||
private final int numberOfMessageDispatchers;
|
||||
|
||||
// all threads that are available for asynchronous message dispatching
|
||||
private final List<Thread> dispatchers;
|
||||
|
||||
// all pending messages scheduled for asynchronous dispatch are queued here
|
||||
private final BlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>(Integer.MAX_VALUE / 16);
|
||||
|
||||
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {
|
||||
|
||||
private final AtomicInteger threadID = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = Executors.defaultThreadFactory().newThread(r);
|
||||
thread.setDaemon(true);// do not prevent the JVM from exiting
|
||||
thread.setName("Dispatcher-" + this.threadID.getAndIncrement());
|
||||
return thread;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
public MBassador() {
|
||||
this(6);
|
||||
}
|
||||
|
||||
public MBassador(int numberOfMessageDispatchers) {
|
||||
super();
|
||||
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
|
||||
|
||||
this.dispatchers = new ArrayList<Thread>(numberOfMessageDispatchers);
|
||||
initDispatcherThreads();
|
||||
}
|
||||
|
||||
// initialize the dispatch workers
|
||||
private void initDispatcherThreads() {
|
||||
for (int i = 0; i < this.numberOfMessageDispatchers; i++) {
|
||||
// each thread will run forever and process incoming
|
||||
// message publication requests
|
||||
Thread dispatcher = MessageDispatchThreadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
T message = null;
|
||||
while (true) {
|
||||
try {
|
||||
message = MBassador.this.pendingMessages.take();
|
||||
publishMessage(message);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} catch (Throwable t) {
|
||||
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
dispatcher.setName("Message dispatcher");
|
||||
this.dispatchers.add(dispatcher);
|
||||
dispatcher.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
|
||||
* The call blocks until every messageHandler has processed the message.
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
@Override
|
||||
public void publish(T message) {
|
||||
try {
|
||||
publishMessage(message);
|
||||
} catch (Throwable e) {
|
||||
handlePublicationError(new PublicationError()
|
||||
.setMessage("Error during publication of message")
|
||||
.setCause(e)
|
||||
.setPublishedObject(message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the message publication asynchronously. The behavior of this method depends on the
|
||||
* configured queuing strategy:
|
||||
* <p/>
|
||||
* If an unbound queuing strategy is used the call returns immediately.
|
||||
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||
*
|
||||
* @return A message publication that can be used to access information about it's state
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message) {
|
||||
try {
|
||||
this.pendingMessages.put(message);
|
||||
} catch (InterruptedException e) {
|
||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||
* configured queuing strategy:
|
||||
* <p/>
|
||||
* If an unbound queuing strategy is used the call returns immediately.
|
||||
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||
* or the timeout is reached.
|
||||
*
|
||||
* @return A message publication that wraps up the publication request
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||
try {
|
||||
this.pendingMessages.offer(message, timeout, unit);
|
||||
} catch (InterruptedException e) {
|
||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
super.finalize();
|
||||
shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
for (Thread dispatcher : this.dispatchers) {
|
||||
dispatcher.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPendingMessages() {
|
||||
return this.pendingMessages.size() > 0;
|
||||
}
|
||||
}
|
|
@ -6,8 +6,6 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
|
||||
import net.engio.mbassy.PubSubSupport;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
|
||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
|
@ -28,10 +26,8 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, Erro
|
|||
private final SubscriptionManager subscriptionManager;
|
||||
|
||||
|
||||
public AbstractPubSubSupport(IBusConfiguration configuration) {
|
||||
// configure the pub sub feature
|
||||
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
|
||||
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader());
|
||||
public AbstractPubSubSupport() {
|
||||
this.subscriptionManager = new SubscriptionManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
package net.engio.mbassy.bus;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import net.engio.mbassy.IMessageBus;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
|
||||
/**
|
||||
* The base class for all message bus implementations with support for asynchronous message dispatch
|
||||
*/
|
||||
public abstract class AbstractSyncAsyncMessageBus<T>
|
||||
extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
||||
|
||||
// executor for asynchronous message handlers
|
||||
private final ExecutorService executor;
|
||||
|
||||
// all threads that are available for asynchronous message dispatching
|
||||
private final List<Thread> dispatchers;
|
||||
|
||||
// all pending messages scheduled for asynchronous dispatch are queued here
|
||||
private final BlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>(Integer.MAX_VALUE/16);
|
||||
|
||||
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
|
||||
super(configuration);
|
||||
|
||||
// configure asynchronous message dispatch
|
||||
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
|
||||
this.dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
|
||||
initDispatcherThreads(asyncDispatch);
|
||||
|
||||
// configure asynchronous handler invocation
|
||||
Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class);
|
||||
this.executor = asyncInvocation.getExecutor();
|
||||
}
|
||||
|
||||
// initialize the dispatch workers
|
||||
private void initDispatcherThreads(Feature.AsynchronousMessageDispatch configuration) {
|
||||
for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); i++) {
|
||||
// each thread will run forever and process incoming
|
||||
// message publication requests
|
||||
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
T message = null;
|
||||
while (true) {
|
||||
try {
|
||||
message = AbstractSyncAsyncMessageBus.this.pendingMessages.take();
|
||||
publishMessage(message);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} catch(Throwable t){
|
||||
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
dispatcher.setName("Message dispatcher");
|
||||
this.dispatchers.add(dispatcher);
|
||||
dispatcher.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// this method queues a message delivery request
|
||||
protected void addAsynchronousPublication(T message) {
|
||||
try {
|
||||
this.pendingMessages.put(message);
|
||||
} catch (InterruptedException e) {
|
||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||
}
|
||||
}
|
||||
|
||||
// this method queues a message delivery request
|
||||
protected void addAsynchronousPublication(T message, long timeout, TimeUnit unit) {
|
||||
try {
|
||||
this.pendingMessages.offer(message, timeout, unit);
|
||||
} catch (InterruptedException e) {
|
||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
super.finalize();
|
||||
shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
for (Thread dispatcher : this.dispatchers) {
|
||||
dispatcher.interrupt();
|
||||
}
|
||||
if(this.executor != null) {
|
||||
this.executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPendingMessages() {
|
||||
return this.pendingMessages.size() > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
package net.engio.mbassy.bus;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import net.engio.mbassy.IMessageBus;
|
||||
import net.engio.mbassy.bus.config.BusConfiguration;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
|
||||
|
||||
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T> implements IMessageBus<T> {
|
||||
|
||||
public MBassador(IBusConfiguration configuration) {
|
||||
super(configuration);
|
||||
}
|
||||
|
||||
public MBassador(){
|
||||
super(new BusConfiguration()
|
||||
.addFeature(Feature.SyncPubSub.Default())
|
||||
.addFeature(Feature.AsynchronousHandlerInvocation.Default())
|
||||
.addFeature(Feature.AsynchronousMessageDispatch.Default()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
|
||||
* The call blocks until every messageHandler has processed the message.
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
@Override
|
||||
public void publish(T message) {
|
||||
try {
|
||||
publishMessage(message);
|
||||
} catch (Throwable e) {
|
||||
handlePublicationError(new PublicationError()
|
||||
.setMessage("Error during publication of message")
|
||||
.setCause(e)
|
||||
.setPublishedObject(message));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||
* configured queuing strategy:
|
||||
* <p/>
|
||||
* If an unbound queuing strategy is used the call returns immediately.
|
||||
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||
*
|
||||
* @return A message publication that can be used to access information about it's state
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message) {
|
||||
addAsynchronousPublication(message);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||
* configured queuing strategy:
|
||||
* <p/>
|
||||
* If an unbound queuing strategy is used the call returns immediately.
|
||||
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||
* or the timeout is reached.
|
||||
*
|
||||
* @return A message publication that wraps up the publication request
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||
addAsynchronousPublication(message, timeout, unit);
|
||||
}
|
||||
}
|
|
@ -1,57 +0,0 @@
|
|||
package net.engio.mbassy.bus;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import net.engio.mbassy.IMessageBus;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
|
||||
/**
|
||||
* A message bus implementation that offers only synchronous message publication. Using this bus
|
||||
* will not create any new threads.
|
||||
*
|
||||
*/
|
||||
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
||||
|
||||
|
||||
public SyncMessageBus(IBusConfiguration configuration) {
|
||||
super(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(T message) {
|
||||
try {
|
||||
publishMessage(message);
|
||||
} catch (Throwable e) {
|
||||
handlePublicationError(new PublicationError()
|
||||
.setMessage("Error during publication of message")
|
||||
.setCause(e)
|
||||
.setPublishedObject(message));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishAsync(T message) {
|
||||
publish(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||
publish(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPendingMessages() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package net.engio.mbassy.bus.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
|
||||
*/
|
||||
public class BusConfiguration implements IBusConfiguration {
|
||||
|
||||
// the registered features
|
||||
private Map<Class<? extends Feature>, Feature> features = new HashMap<Class<? extends Feature>, Feature>();
|
||||
|
||||
public BusConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends Feature> T getFeature(Class<T> feature) {
|
||||
return (T)this.features.get(feature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IBusConfiguration addFeature(Feature feature) {
|
||||
this.features.put(feature.getClass(), feature);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
package net.engio.mbassy.bus.config;
|
||||
|
||||
/**
|
||||
* Todo: Add javadoc
|
||||
*
|
||||
* @author bennidi
|
||||
* Date: 8/29/14
|
||||
*/
|
||||
public class ConfigurationError {
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
package net.engio.mbassy.bus.config;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.engio.mbassy.listener.MetadataReader;
|
||||
|
||||
/**
|
||||
* A feature defines the configuration of a specific functionality of a message bus.
|
||||
*
|
||||
* @author bennidi
|
||||
* Date: 8/29/14
|
||||
*/
|
||||
public interface Feature {
|
||||
|
||||
|
||||
class SyncPubSub implements Feature {
|
||||
|
||||
public static final SyncPubSub Default(){
|
||||
return new SyncPubSub()
|
||||
.setMetadataReader(new MetadataReader());
|
||||
}
|
||||
|
||||
private MetadataReader metadataReader;
|
||||
|
||||
|
||||
public MetadataReader getMetadataReader() {
|
||||
return metadataReader;
|
||||
}
|
||||
|
||||
public SyncPubSub setMetadataReader(MetadataReader metadataReader) {
|
||||
this.metadataReader = metadataReader;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
class AsynchronousHandlerInvocation implements Feature {
|
||||
|
||||
protected static final ThreadFactory MessageHandlerThreadFactory = new ThreadFactory() {
|
||||
|
||||
private final AtomicInteger threadID = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = Executors.defaultThreadFactory().newThread(r);
|
||||
thread.setName("AsyncHandler-" + threadID.getAndIncrement());
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
};
|
||||
|
||||
public static final AsynchronousHandlerInvocation Default(){
|
||||
int numberOfCores = Runtime.getRuntime().availableProcessors();
|
||||
return Default(numberOfCores, numberOfCores * 2);
|
||||
}
|
||||
|
||||
public static final AsynchronousHandlerInvocation Default(int initialCoreThreads, int maximumCoreThreads){
|
||||
int numberOfCores = Runtime.getRuntime().availableProcessors();
|
||||
return new AsynchronousHandlerInvocation().setExecutor(new ThreadPoolExecutor(initialCoreThreads, maximumCoreThreads, 1,
|
||||
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), MessageHandlerThreadFactory));
|
||||
}
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
public ExecutorService getExecutor() {
|
||||
return executor;
|
||||
}
|
||||
|
||||
public AsynchronousHandlerInvocation setExecutor(ExecutorService executor) {
|
||||
this.executor = executor;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
class AsynchronousMessageDispatch implements Feature {
|
||||
|
||||
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {
|
||||
|
||||
private final AtomicInteger threadID = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = Executors.defaultThreadFactory().newThread(r);
|
||||
thread.setDaemon(true);// do not prevent the JVM from exiting
|
||||
thread.setName("Dispatcher-" + threadID.getAndIncrement());
|
||||
return thread;
|
||||
}
|
||||
};
|
||||
|
||||
public static final AsynchronousMessageDispatch Default(){
|
||||
return new AsynchronousMessageDispatch()
|
||||
.setNumberOfMessageDispatchers(2)
|
||||
.setDispatcherThreadFactory(MessageDispatchThreadFactory);
|
||||
}
|
||||
|
||||
|
||||
private int numberOfMessageDispatchers;
|
||||
private ThreadFactory dispatcherThreadFactory;
|
||||
|
||||
public int getNumberOfMessageDispatchers() {
|
||||
return numberOfMessageDispatchers;
|
||||
}
|
||||
|
||||
public AsynchronousMessageDispatch setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
|
||||
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public ThreadFactory getDispatcherThreadFactory() {
|
||||
return dispatcherThreadFactory;
|
||||
}
|
||||
|
||||
public AsynchronousMessageDispatch setDispatcherThreadFactory(ThreadFactory dispatcherThreadFactory) {
|
||||
this.dispatcherThreadFactory = dispatcherThreadFactory;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,25 +0,0 @@
|
|||
package net.engio.mbassy.bus.config;
|
||||
|
||||
/**
|
||||
* The configuration of message bus instances is feature driven, e.g. configuration parameters
|
||||
* are grouped into {@link Feature}.
|
||||
*
|
||||
* Features can be added to a bus configuration to be used later in the instantiation process of the message bus.
|
||||
* Each bus will look for the features it requires and configure them according to the provided configuration. If a required feature is not found the bus will publish a {@link ConfigurationError}
|
||||
* to the {@link ConfigurationErrorHandler}
|
||||
*
|
||||
* @author bennidi.
|
||||
*/
|
||||
public interface IBusConfiguration{
|
||||
|
||||
/**
|
||||
* Get a registered feature by its type (class).
|
||||
*
|
||||
* @param feature
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
<T extends Feature> T getFeature(Class<T> feature);
|
||||
|
||||
IBusConfiguration addFeature(Feature feature);
|
||||
}
|
|
@ -10,7 +10,6 @@ package net.engio.mbassy.bus.error;
|
|||
* @author bennidi
|
||||
* Date: 2/22/12
|
||||
*/
|
||||
@SuppressWarnings("PMD.UnusedModifier")
|
||||
public interface IPublicationErrorHandler {
|
||||
|
||||
/**
|
||||
|
|
|
@ -7,6 +7,7 @@ package net.engio.mbassy.bus.error;
|
|||
* Date: 3/29/13
|
||||
*/
|
||||
public class MessageBusException extends Exception {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public MessageBusException() {
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ public class MessageHandler {
|
|||
|
||||
private final boolean isSynchronized;
|
||||
|
||||
|
||||
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){
|
||||
super();
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import net.engio.mbassy.listener.MetadataReader;
|
|||
public class SubscriptionManager {
|
||||
|
||||
// the metadata reader that is used to inspect objects passed to the subscribe method
|
||||
private final MetadataReader metadataReader;
|
||||
private final MetadataReader metadataReader = new MetadataReader();
|
||||
|
||||
// all subscriptions per message type
|
||||
// this is the primary list for dispatching a specific message
|
||||
|
@ -52,8 +52,7 @@ public class SubscriptionManager {
|
|||
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
|
||||
|
||||
public SubscriptionManager(MetadataReader metadataReader) {
|
||||
this.metadataReader = metadataReader;
|
||||
public SubscriptionManager() {
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import org.junit.runners.Suite;
|
|||
WeakConcurrentSetTest.class,
|
||||
MBassadorTest.class,
|
||||
SyncBusTest.MBassadorTest.class,
|
||||
SyncBusTest.SyncMessageBusTest.class,
|
||||
MetadataReaderTest.class,
|
||||
MethodDispatchTest.class,
|
||||
DeadMessageTest.class,
|
||||
|
|
|
@ -18,11 +18,11 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
@Test
|
||||
public void testSingleThreadedSyncFIFO(){
|
||||
// create a fifo bus with 1000 concurrently subscribed listeners
|
||||
IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
|
||||
IMessageBus<Integer> fifoBUs = new MBassador<Integer>();
|
||||
|
||||
List<SyncListener> listeners = new LinkedList<SyncListener>();
|
||||
List<Listener> listeners = new LinkedList<Listener>();
|
||||
for(int i = 0; i < 1000 ; i++){
|
||||
SyncListener listener = new SyncListener();
|
||||
Listener listener = new Listener();
|
||||
listeners.add(listener);
|
||||
fifoBUs.subscribe(listener);
|
||||
}
|
||||
|
@ -34,14 +34,14 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
}
|
||||
// publish in ascending order
|
||||
for(Integer message : messages) {
|
||||
fifoBUs.publishAsync(message);
|
||||
fifoBUs.publish(message);
|
||||
}
|
||||
|
||||
while(fifoBUs.hasPendingMessages()) {
|
||||
pause(1000);
|
||||
}
|
||||
|
||||
for(SyncListener listener : listeners){
|
||||
for(Listener listener : listeners){
|
||||
assertEquals(messages.length, listener.receivedSync.size());
|
||||
for(int i=0; i < messages.length; i++){
|
||||
assertEquals(messages[i], listener.receivedSync.get(i));
|
||||
|
@ -50,15 +50,14 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
|
||||
}
|
||||
|
||||
// NOTE: Can fail due to timing issues.
|
||||
@Test
|
||||
public void testSingleThreadedSyncAsyncFIFO(){
|
||||
// create a fifo bus with 1000 concurrently subscribed listeners
|
||||
IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
|
||||
IMessageBus<Integer> fifoBUs = new MBassador<Integer>(1);
|
||||
|
||||
List<SyncAsyncListener> listeners = new LinkedList<SyncAsyncListener>();
|
||||
List<Listener> listeners = new LinkedList<Listener>();
|
||||
for(int i = 0; i < 1000 ; i++){
|
||||
SyncAsyncListener listener = new SyncAsyncListener();
|
||||
Listener listener = new Listener();
|
||||
listeners.add(listener);
|
||||
fifoBUs.subscribe(listener);
|
||||
}
|
||||
|
@ -69,42 +68,38 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
messages[i] = i;
|
||||
}
|
||||
// publish in ascending order
|
||||
for(Integer message : messages) {
|
||||
for (Integer message : messages) {
|
||||
fifoBUs.publishAsync(message);
|
||||
}
|
||||
|
||||
while(fifoBUs.hasPendingMessages()) {
|
||||
while (fifoBUs.hasPendingMessages()) {
|
||||
pause(2000);
|
||||
}
|
||||
|
||||
for(SyncAsyncListener listener : listeners){
|
||||
assertEquals(messages.length, listener.receivedSync.size());
|
||||
for(int i=0; i < messages.length; i++){
|
||||
assertEquals(messages[i], listener.receivedSync.get(i));
|
||||
for(Listener listener : listeners) {
|
||||
List<Integer> receivedSync = listener.receivedSync;
|
||||
|
||||
synchronized (receivedSync) {
|
||||
assertEquals(messages.length, receivedSync.size());
|
||||
|
||||
for(int i=0; i < messages.length; i++){
|
||||
assertEquals(messages[i], receivedSync.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class SyncListener {
|
||||
public static class Listener {
|
||||
|
||||
private List<Integer> receivedSync = new LinkedList<Integer>();
|
||||
|
||||
@Handler
|
||||
public void handleSync(Integer message){
|
||||
this.receivedSync.add(message);
|
||||
synchronized (this.receivedSync) {
|
||||
this.receivedSync.add(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class SyncAsyncListener {
|
||||
|
||||
private List<Integer> receivedSync = new LinkedList<Integer>();
|
||||
|
||||
@Handler
|
||||
public void handleSync(Integer message){
|
||||
this.receivedSync.add(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import java.util.Set;
|
|||
|
||||
import net.engio.mbassy.annotations.Handler;
|
||||
import net.engio.mbassy.annotations.Synchronized;
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.common.MessageBusTest;
|
||||
|
||||
import org.junit.Test;
|
||||
|
@ -95,7 +94,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
|
|||
|
||||
@Test
|
||||
public void testMetaHandlerFiltering() {
|
||||
MBassador bus = createBus(SyncAsync());
|
||||
MBassador bus = createBus();
|
||||
|
||||
NamedMessageListener listener = new NamedMessageListener();
|
||||
bus.subscribe( listener );
|
||||
|
|
|
@ -4,7 +4,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import net.engio.mbassy.annotations.Handler;
|
||||
import net.engio.mbassy.bus.DeadMessage;
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.common.ConcurrentExecutor;
|
||||
import net.engio.mbassy.common.ListenerFactory;
|
||||
import net.engio.mbassy.common.MessageBusTest;
|
||||
|
@ -33,7 +32,7 @@ public class DeadMessageTest extends MessageBusTest{
|
|||
|
||||
@Test
|
||||
public void testDeadMessage(){
|
||||
final MBassador bus = createBus(SyncAsync());
|
||||
final MBassador bus = createBus();
|
||||
ListenerFactory listeners = new ListenerFactory()
|
||||
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
|
||||
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
|
||||
|
@ -69,7 +68,7 @@ public class DeadMessageTest extends MessageBusTest{
|
|||
|
||||
@Test
|
||||
public void testUnsubscribingAllListeners() {
|
||||
final MBassador bus = createBus(SyncAsync());
|
||||
final MBassador bus = createBus();
|
||||
ListenerFactory deadMessageListener = new ListenerFactory()
|
||||
.create(InstancesPerListener, DeadMessagHandler.class)
|
||||
.create(InstancesPerListener, Object.class);
|
||||
|
|
|
@ -2,7 +2,6 @@ package net.engio.mbassy;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
import net.engio.mbassy.common.ConcurrentExecutor;
|
||||
|
@ -35,7 +34,7 @@ public class MBassadorTest extends MessageBusTest {
|
|||
ListenerFactory listeners = new ListenerFactory()
|
||||
.create(InstancesPerListener, Listeners.synchronous())
|
||||
.create(InstancesPerListener, Listeners.noHandlers());
|
||||
final MBassador bus = createBus(SyncAsync(), listeners);
|
||||
final MBassador bus = createBus(listeners);
|
||||
|
||||
|
||||
Runnable publishAndCheck = new Runnable() {
|
||||
|
@ -69,7 +68,7 @@ public class MBassadorTest extends MessageBusTest {
|
|||
|
||||
ListenerFactory listeners = new ListenerFactory()
|
||||
.create(InstancesPerListener, Listeners.noHandlers());
|
||||
final MBassador bus = createBus(SyncAsync(), listeners);
|
||||
final MBassador bus = createBus(listeners);
|
||||
|
||||
|
||||
final MessageManager messageManager = new MessageManager();
|
||||
|
@ -103,7 +102,7 @@ public class MBassadorTest extends MessageBusTest {
|
|||
}
|
||||
};
|
||||
|
||||
final MBassador bus = new MBassador(SyncAsync());
|
||||
final MBassador bus = new MBassador();
|
||||
bus.addErrorHandler(ExceptionCounter);
|
||||
ListenerFactory listeners = new ListenerFactory()
|
||||
.create(InstancesPerListener, ExceptionThrowingListener.class);
|
||||
|
|
|
@ -41,7 +41,7 @@ public class MethodDispatchTest extends MessageBusTest{
|
|||
|
||||
@Test
|
||||
public void testDispatch1(){
|
||||
IMessageBus bus = createBus(SyncAsync());
|
||||
IMessageBus bus = createBus();
|
||||
EventListener2 listener2 = new EventListener2();
|
||||
bus.subscribe(listener2);
|
||||
bus.publish("jfndf");
|
||||
|
|
|
@ -5,7 +5,6 @@ import net.engio.mbassy.common.ConcurrentExecutor;
|
|||
import net.engio.mbassy.common.ListenerFactory;
|
||||
import net.engio.mbassy.common.SubscriptionValidator;
|
||||
import net.engio.mbassy.common.TestUtil;
|
||||
import net.engio.mbassy.listener.MetadataReader;
|
||||
import net.engio.mbassy.listeners.AbstractMessageListener;
|
||||
import net.engio.mbassy.listeners.ICountableListener;
|
||||
import net.engio.mbassy.listeners.IMessageListener;
|
||||
|
@ -163,7 +162,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||
Overloading.ListenerBase.class,
|
||||
Overloading.ListenerSub.class);
|
||||
|
||||
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader());
|
||||
SubscriptionManager subscriptionManager = new SubscriptionManager();
|
||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
||||
|
||||
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
|
||||
|
@ -182,7 +181,7 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||
}
|
||||
|
||||
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
|
||||
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader());
|
||||
final SubscriptionManager subscriptionManager = new SubscriptionManager();
|
||||
|
||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package net.engio.mbassy;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
import net.engio.mbassy.common.ConcurrentExecutor;
|
||||
|
@ -119,17 +118,6 @@ public abstract class SyncBusTest extends MessageBusTest {
|
|||
|
||||
}
|
||||
|
||||
public static class SyncMessageBusTest extends SyncBusTest {
|
||||
|
||||
|
||||
@Override
|
||||
protected IMessageBus getSyncMessageBus() {
|
||||
return BusFactory.SynchronousOnly();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
static class IncrementingMessage{
|
||||
|
||||
private int count = 1;
|
||||
|
|
|
@ -6,8 +6,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import net.engio.mbassy.annotations.Handler;
|
||||
import net.engio.mbassy.annotations.Synchronized;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.common.MessageBusTest;
|
||||
|
||||
import org.junit.Test;
|
||||
|
@ -27,10 +25,8 @@ public class SynchronizedHandlerTest extends MessageBusTest {
|
|||
@Test
|
||||
public void testSynchronizedWithSynchronousInvocation(){
|
||||
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
|
||||
IBusConfiguration config = SyncAsync();
|
||||
config.getFeature(Feature.AsynchronousMessageDispatch.class).setNumberOfMessageDispatchers(6);
|
||||
|
||||
IMessageBus bus = createBus(config);
|
||||
IMessageBus bus = createBus();
|
||||
for(int i = 0; i < numberOfListeners; i++){
|
||||
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
|
||||
handlers.add(handler);
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
package net.engio.mbassy.common;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.bus.config.BusConfiguration;
|
||||
import net.engio.mbassy.bus.config.Feature;
|
||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||
import net.engio.mbassy.MBassador;
|
||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||
import net.engio.mbassy.bus.error.PublicationError;
|
||||
import net.engio.mbassy.messages.MessageTypes;
|
||||
|
@ -45,21 +42,15 @@ public abstract class MessageBusTest extends AssertSupport {
|
|||
}
|
||||
}
|
||||
|
||||
public static IBusConfiguration SyncAsync() {
|
||||
return new BusConfiguration()
|
||||
.addFeature(Feature.SyncPubSub.Default())
|
||||
.addFeature(Feature.AsynchronousHandlerInvocation.Default())
|
||||
.addFeature(Feature.AsynchronousMessageDispatch.Default());
|
||||
}
|
||||
|
||||
public MBassador createBus(IBusConfiguration configuration) {
|
||||
MBassador bus = new MBassador(configuration);
|
||||
public MBassador createBus() {
|
||||
MBassador bus = new MBassador();
|
||||
bus.addErrorHandler(TestFailingHandler);
|
||||
return bus;
|
||||
}
|
||||
|
||||
public MBassador createBus(IBusConfiguration configuration, ListenerFactory listeners) {
|
||||
MBassador bus = new MBassador(configuration);
|
||||
public MBassador createBus(ListenerFactory listeners) {
|
||||
MBassador bus = new MBassador();
|
||||
bus.addErrorHandler(TestFailingHandler);
|
||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
|
||||
return bus;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package net.engio.mbassy.common;
|
||||
|
||||
import net.engio.mbassy.MBassador;
|
||||
import net.engio.mbassy.PubSubSupport;
|
||||
import net.engio.mbassy.bus.MBassador;
|
||||
import net.engio.mbassy.subscription.SubscriptionManager;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
|
Loading…
Reference in New Issue
Block a user