fixed #35 #44 addressed #45, major refactorings, added documentation and tests

This commit is contained in:
benjamin 2013-09-02 17:49:06 +02:00
parent 8aa3fca57f
commit d6aa291b86
52 changed files with 756 additions and 424 deletions

View File

@ -18,7 +18,7 @@
while preserving resource efficiency and performance.
It features:
declarative listener definition via annotations,
declarative handler definition via annotations,
sync and/or async message delivery,
weak-references,
message filtering,

View File

@ -12,34 +12,35 @@ import java.lang.reflect.Method;
* Date: 2/22/12
* Time: 4:59 PM
*/
public class PublicationError {
public class PublicationError{
// Internal state
private Throwable cause;
private String message;
private Method listener;
private Object listeningObject;
private Method handler;
private Object listener;
private Object publishedObject;
/**
* Compound constructor, creating a PublicationError from the supplied objects.
*
* @param cause The Throwable giving rise to this PublicationError.
* @param message The message to send.
* @param listener The method where the error was created.
* @param listeningObject The object in which the PublicationError was generated.
* @param handler The method where the error was created.
* @param listener The object in which the PublicationError was generated.
* @param publishedObject The published object which gave rise to the error.
*/
public PublicationError(final Throwable cause,
final String message,
final Method listener,
final Object listeningObject,
final Method handler,
final Object listener,
final Object publishedObject) {
this.cause = cause;
this.message = message;
this.handler = handler;
this.listener = listener;
this.listeningObject = listeningObject;
this.publishedObject = publishedObject;
}
@ -77,21 +78,21 @@ public class PublicationError {
return this;
}
public Method getListener() {
return listener;
public Method getHandler() {
return handler;
}
public PublicationError setListener(Method listener) {
this.listener = listener;
public PublicationError setHandler(Method handler) {
this.handler = handler;
return this;
}
public Object getListeningObject() {
return listeningObject;
public Object getListener() {
return listener;
}
public PublicationError setListeningObject(Object listeningObject) {
this.listeningObject = listeningObject;
public PublicationError setListener(Object listener) {
this.listener = listener;
return this;
}
@ -116,9 +117,9 @@ public class PublicationError {
newLine +
"\tmessage='" + message + '\'' +
newLine +
"\tlistener=" + listener +
"\thandler=" + handler +
newLine +
"\tlisteningObject=" + listeningObject +
"\tlistener=" + listener +
newLine +
"\tpublishedObject=" + publishedObject +
'}';

View File

@ -1,11 +1,15 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The base class for all async message bus implementations.
@ -13,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @param <T>
* @param <P>
*/
public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPostCommand> extends AbstractSyncMessageBus<T, P> implements IMessageBus<T, P> {
public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand> extends AbstractSyncMessageBus<T, P> implements IMessageBus<T, P> {
// executor for asynchronous message handlers
private final ExecutorService executor;
@ -24,23 +28,23 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication> pendingMessages;
private static final AtomicInteger threadID = new AtomicInteger();
public AbstractSyncAsyncMessageBus(BusConfiguration configuration) {
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
super(configuration);
this.executor = configuration.getExecutor();
pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
this.executor = configuration.getExecutorForAsynchronousHandlers();
getRuntime().add("handler.async-service", executor);
pendingMessages = configuration.getPendingMessagesQueue();
dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
initDispatcherThreads(configuration);
}
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
private void initDispatcherThreads(IBusConfiguration configuration) {
for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
// message publication requests
Thread dispatcher = configuration.getThreadFactoryForAsynchronousMessageDispatch().newThread(new Runnable() {
public void run() {
while (true) {
try {
@ -54,8 +58,6 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
}
}
});
dispatcher.setDaemon(true); // do not prevent the JVM from exiting
dispatcher.setName("MBassyDispatch-" + threadID.incrementAndGet());
dispatchers.add(dispatcher);
dispatcher.start();
}
@ -68,7 +70,8 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
pendingMessages.put(request);
return request.markScheduled();
} catch (InterruptedException e) {
return request.setError();
// TODO: publication error
return request;
}
}
@ -77,9 +80,10 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
try {
return pendingMessages.offer(request, timeout, unit)
? request.markScheduled()
: request.setError();
: request;
} catch (InterruptedException e) {
return request.setError();
// TODO: publication error
return request;
}
}
@ -89,6 +93,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
super.finalize();
}
@Override
public void shutdown() {
for (Thread dispatcher : dispatchers) {
dispatcher.interrupt();
@ -96,6 +101,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
if(executor != null) executor.shutdown();
}
@Override
public boolean hasPendingMessages() {
return pendingMessages.size() > 0;
}

View File

@ -2,16 +2,16 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.publication.IPublicationCommand;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.*;
/**
* The base class for all message bus implementations.
@ -19,7 +19,7 @@ import java.util.*;
* @param <T>
* @param <P>
*/
public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> implements ISyncMessageBus<T, P> {
public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> implements ISyncMessageBus<T, P>{
// this handler will receive all errors that occur during message dispatch or message handling
@ -29,10 +29,14 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
private final SubscriptionManager subscriptionManager;
private final BusRuntime runtime;
public AbstractSyncMessageBus(SyncBusConfiguration configuration) {
public AbstractSyncMessageBus(IBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory().setBus(this));
configuration.getSubscriptionFactory(), runtime);
this.publicationFactory = configuration.getMessagePublicationFactory();
}
@ -61,15 +65,19 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
}
}
@Override
public BusRuntime getRuntime() {
return runtime;
}
protected MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
return getPublicationFactory().createPublication(runtime, subscriptions, new DeadMessage(message));
} else {
return getPublicationFactory().createPublication(this, subscriptions, message);
return getPublicationFactory().createPublication(runtime, subscriptions, message);
}
}

View File

@ -1,72 +0,0 @@
package net.engio.mbassy.bus;
import java.util.concurrent.*;
/**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
*
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration extends SyncBusConfiguration<BusConfiguration> {
private static final ThreadFactory DaemonThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);
return thread;
}
};
public static BusConfiguration Default() {
return new BusConfiguration();
}
private int numberOfMessageDispatchers;
private ExecutorService executor;
private int maximumNumberOfPendingMessages;
public BusConfiguration() {
super();
this.numberOfMessageDispatchers = 2;
this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
}
public int getNumberOfMessageDispatchers() {
return numberOfMessageDispatchers > 0 ? numberOfMessageDispatchers : 2;
}
public BusConfiguration setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
return this;
}
/**
* By default an unbound queuing strategy is used to ensure that no events get lost
* @return
*/
public ExecutorService getExecutor() {
return executor;
}
public BusConfiguration setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}
public int getMaximumNumberOfPendingMessages() {
return maximumNumberOfPendingMessages;
}
public BusConfiguration setMaximumNumberOfPendingMessages(int maximumNumberOfPendingMessages) {
this.maximumNumberOfPendingMessages = maximumNumberOfPendingMessages > 0
? maximumNumberOfPendingMessages
: Integer.MAX_VALUE;
return this;
}
}

View File

@ -0,0 +1,51 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.common.MissingPropertyException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Message bus implementations potentially vary in the features they provide and consequently in the components and properties
* they expose. The runtime is a container for all those dynamic properties and components and is meant to be passed around
* between collaborating objects such that they may access the different functionality provided by the bus implementation
* they all belong to.
*
* It is the responsibility of the bus implementation to create and configure the runtime according to its capabilities,
*
*/
public class BusRuntime {
private PubSubSupport provider;
private Map<String, Object> properties = new HashMap<String, Object>();
public BusRuntime(PubSubSupport provider) {
this.provider = provider;
}
public <T> T get(String key){
if(!contains(key))
throw new MissingPropertyException("The property " + key + " is not available in this runtime");
else return (T) properties.get(key);
}
public PubSubSupport getProvider(){
return provider;
}
public Collection<String> getKeys(){
return properties.keySet();
}
public BusRuntime add(String key, Object property){
properties.put(key, property);
return this;
}
public boolean contains(String key){
return properties.containsKey(key);
}
}

View File

@ -0,0 +1,27 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import java.util.Collection;
public interface ErrorHandlingSupport {
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*
* @param errorHandler
*/
void addErrorHandler(IPublicationErrorHandler errorHandler);
/**
* Returns an immutable collection containing all the registered error handlers
*
* @return
*/
Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
}

View File

@ -0,0 +1,25 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
* This interface is meant to be implemented by different bus implementations to offer a consistent way
* to plugin different flavors of message publication.
*
* The parametrization of the IPostCommand influences which publication flavours are available.
*
*/
public interface GenericMessagePublicationSupport<T, P extends IPublicationCommand> {
/**
* Publish a message to the bus using on of its supported message publication mechanisms. The supported
* mechanisms depend on the available implementation and are exposed as subclasses of IPublicationCommand.
* The standard mechanism is the synchronous dispatch which will publish the message in the current thread
* and returns after every matching handler has been invoked. @See IPublicationCommand.
*
* @param message
* @return
*/
P post(T message);
}

View File

@ -0,0 +1,19 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
import java.util.concurrent.TimeUnit;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 8/21/13
* Time: 11:05 AM
* To change this template use File | Settings | File Templates.
*/
public interface IMBassador<T> extends IMessageBus<T, SyncAsyncPostCommand<T>> {
MessagePublication publishAsync(T message);
MessagePublication publishAsync(T message, long timeout, TimeUnit unit);
}

View File

@ -1,42 +1,55 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for per message handler and message publication.
* A message publication is the publication of any message using one of the bus' publication methods.
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.
* A message publication starts when an object is send to the bus using one of the its publication methods.
*
* Messages can be published synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* Message handlers can be invoked synchronously or asynchronously depending on their configuration. Thus, there
* are two notions of synchronicity / asynchronicity. One on the caller side, e.g. the invocation of the message publishing
* methods. The second on the handler side, e.g. whether the handler is invoked in the same or a different thread.
*
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously.
* Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* in a synchronized fashion using @Synchronized annotation
*
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Handler annotation.
*
* <p/>
* The bus uses weak references to all listeners such that registered listeners do not need to
* By default, the bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched.
* removed on-the-fly as messages get dispatched. This can be changed using the @Listener annotation.
*
* <p/>
* Generally message handlers will be invoked in inverse sequence of subscription but any
* client using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the subscribed message handlers.
* a specific message exactly once to each of the respective message handlers.
*
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
* a message handler may define filters to narrow the set of messages that it accepts.
*
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any message listener may only be subscribed once -> subsequent subscriptions of an already subscribed message listener
* will be silently ignored)
*
* <p/>
* Removing a listener (unsubscribing) means removing all subscribed message handlers of that listener. This remove operation
* immediately takes effect and on all running dispatch processes -> A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
* after the remove operation completed.
*
* <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
@ -44,7 +57,13 @@ import java.util.concurrent.TimeUnit;
* @Author bennidi
* Date: 2/8/12
*/
public interface IMessageBus<T, P extends IMessageBus.IPostCommand> extends ISyncMessageBus<T,P> {
public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P> {
/**
* {@inheritDoc}
*/
@Override
P post(T message);
/**
* Get the executor service that is used for asynchronous message publications.
@ -68,37 +87,5 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> extends ISyn
*/
void shutdown();
/**
* @param message
* @return
*/
P post(T message);
interface IPostCommand extends ISyncPostCommand {
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*
* @return A message publication that can be used to access information about the state of
*/
MessagePublication asynchronously();
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/
MessagePublication asynchronously(long timeout, TimeUnit unit);
}
}

View File

@ -1,55 +1,12 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PubSubSupport;
import java.util.Collection;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
*
*
* @author bennidi
* Date: 3/29/13
*/
public interface ISyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> extends PubSubSupport<T>{
public interface ISyncMessageBus<T, P extends IPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P>{
/**
* @param message
* @return
*/
P post(T message);
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*
* @param errorHandler
*/
void addErrorHandler(IPublicationErrorHandler errorHandler);
/**
* Returns an immutable collection containing all the registered error handlers
*
* @return
*/
Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
/**
* A post command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the functionality provided by the message bus that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*/
interface ISyncPostCommand {
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
void now();
}
}

View File

@ -1,21 +1,25 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
import java.util.concurrent.TimeUnit;
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> {
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> implements IMBassador<T> {
public MBassador(BusConfiguration configuration) {
super(configuration);
}
@Override
public MessagePublication publishAsync(T message) {
return addAsynchronousDeliveryRequest(createMessagePublication(message));
}
@Override
public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit);
}

View File

@ -19,26 +19,15 @@ import java.util.Collection;
*/
public class MessagePublication {
public static class Factory {
private final Collection<Subscription> subscriptions;
private final Object message;
// message publications can be referenced by multiple threads to query publication progress
private volatile State state = State.Initial;
private volatile boolean delivered = false;
private final BusRuntime runtime;
public MessagePublication createPublication(ISyncMessageBus owningBus, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(owningBus, subscriptions, message, State.Initial);
}
}
private Collection<Subscription> subscriptions;
private Object message;
private State state = State.Initial;
private boolean delivered = false;
private ISyncMessageBus bus;
protected MessagePublication(ISyncMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
this.bus = bus;
protected MessagePublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message, State initialState) {
this.runtime = runtime;
this.subscriptions = subscriptions;
this.message = message;
this.state = initialState;
@ -51,15 +40,15 @@ public class MessagePublication {
protected void execute() {
state = State.Running;
for (Subscription sub : subscriptions) {
sub.publish(this, message);
sub.publish(this, message);
}
state = State.Finished;
// if the message has not been marked delivered by the dispatcher
if (!delivered) {
if (!isFilteredEvent() && !isDeadEvent()) {
bus.post(new FilteredMessage(message)).now();
runtime.getProvider().publish(new FilteredMessage(message));
} else if (!isDeadEvent()) {
bus.post(new DeadMessage(message)).now();
runtime.getProvider().publish(new DeadMessage(message));
}
}
@ -82,17 +71,12 @@ public class MessagePublication {
}
public MessagePublication markScheduled() {
if (!state.equals(State.Initial)) {
return this;
if (state.equals(State.Initial)) {
state = State.Scheduled;
}
state = State.Scheduled;
return this;
}
public MessagePublication setError() {
state = State.Error;
return this;
}
public boolean isDeadEvent() {
return DeadMessage.class.isAssignableFrom(message.getClass());
@ -106,4 +90,12 @@ public class MessagePublication {
Initial, Scheduled, Running, Finished, Error
}
public static class Factory {
public MessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(runtime, subscriptions, message, State.Initial);
}
}
}

View File

@ -1,9 +0,0 @@
package net.engio.mbassy.bus;
/**
*
* @author bennidi
* Date: 5/25/13
*/
public interface PostCommand {
}

View File

@ -1,17 +1,16 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
/**
* This interface defines the very basic message publication semantics according to the publish subscribe pattern.
*
* Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed
*
* Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its
* handlers will be registered and start to receive matching message publications.
*
*/
public interface PubSubSupport<T> {
public interface PubSubSupport<T> extends RuntimeProvider{
/**
* Subscribe all handler of the given listener. Any listener may only be subscribed once
* -> subsequent subscriptions of an already subscribed listener will be silently ignored)
* Subscribe all handlers of the given listener. Any listener is only subscribed once
* -> subsequent subscriptions of an already subscribed listener will be silently ignored
*
* @param listener
*/
@ -19,8 +18,8 @@ public interface PubSubSupport<T> {
/**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the message listener was still subscribed).
* have effectively been removed and will not receive any messages (provided that running publications (iterators) in other threads
* have not yet obtained a reference to the listener)
* <p/>
* A call to this method passing any object that is not subscribed will not have any effect and is silently ignored.
*
@ -32,8 +31,9 @@ public interface PubSubSupport<T> {
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message.
* Synchronously publish a message to all registered listeners. This includes listeners defined for super types of the
* given message type, provided they are not configured to reject valid subtype. The call returns when all matching handlers
* of all registered listeners have been notified (invoked) of the message.
*
* @param message
*/

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.bus;
/**
* Each message bus provides a runtime object to access its dynamic features and runtime configuration.
*/
public interface RuntimeProvider {
BusRuntime getRuntime();
}

View File

@ -1,6 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
* Created with IntelliJ IDEA.
@ -12,7 +14,7 @@ import net.engio.mbassy.PublicationError;
public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.SyncPostCommand>{
public SyncMessageBus(SyncBusConfiguration configuration) {
public SyncMessageBus(IBusConfiguration configuration) {
super(configuration);
}
@ -40,7 +42,7 @@ public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.
return new SyncPostCommand(message);
}
public class SyncPostCommand implements ISyncMessageBus.ISyncPostCommand{
public class SyncPostCommand implements IPublicationCommand {
private T message;

View File

@ -0,0 +1,143 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
*
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration implements IBusConfiguration {
protected static final ThreadFactory AsynchronousHandlerThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("AsyncHandler-" + threadID.getAndIncrement());
thread.setDaemon(true);
return thread;
}
};
protected static final ThreadFactory DispatcherThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);// do not prevent the JVM from exiting
thread.setName("Dispatcher-" + threadID.getAndIncrement());
return thread;
}
};
public static BusConfiguration Default() {
BusConfiguration defaultConfig = new BusConfiguration();
int numberOfCoreThreads = Runtime.getRuntime().availableProcessors();
defaultConfig.setExecutorForAsynchronousHandlers(new ThreadPoolExecutor(numberOfCoreThreads, numberOfCoreThreads*2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), AsynchronousHandlerThreadFactory));
defaultConfig.setMetadataReader(new MetadataReader());
defaultConfig.setSubscriptionFactory(new SubscriptionFactory());
defaultConfig.setNumberOfMessageDispatchers(2);
defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory());
defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory);
return defaultConfig;
}
public static BusConfiguration Empty(){
return new BusConfiguration();
}
protected int numberOfMessageDispatchers;
protected ExecutorService executor;
protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
protected ThreadFactory dispatcherThreadFactory;
public void setPendingMessagesQueue(BlockingQueue<MessagePublication> pendingMessagesQueue) {
this.pendingMessagesQueue = pendingMessagesQueue;
}
protected BlockingQueue<MessagePublication> pendingMessagesQueue;
private BusConfiguration() {
super();
}
@Override
public int getNumberOfMessageDispatchers() {
return numberOfMessageDispatchers > 0 ? numberOfMessageDispatchers : 2;
}
public BusConfiguration setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
return this;
}
@Override
public ExecutorService getExecutorForAsynchronousHandlers() {
return executor;
}
@Override
public BlockingQueue<MessagePublication> getPendingMessagesQueue() {
return new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE);
}
@Override
public ThreadFactory getThreadFactoryForAsynchronousMessageDispatch() {
return dispatcherThreadFactory;
}
public BusConfiguration setThreadFactoryForAsynchronousMessageDispatch(ThreadFactory factory) {
dispatcherThreadFactory = factory;
return this;
}
public BusConfiguration setExecutorForAsynchronousHandlers(ExecutorService executor) {
this.executor = executor;
return this;
}
@Override
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public BusConfiguration setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
return this;
}
@Override
public MetadataReader getMetadataReader() {
return metadataReader;
}
public void setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
}
@Override
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public BusConfiguration setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
}

View File

@ -0,0 +1,34 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 8/16/13
* Time: 9:56 AM
* To change this template use File | Settings | File Templates.
*/
public interface IBusConfiguration {
int getNumberOfMessageDispatchers();
ExecutorService getExecutorForAsynchronousHandlers();
BlockingQueue<MessagePublication> getPendingMessagesQueue();
MessagePublication.Factory getMessagePublicationFactory();
MetadataReader getMetadataReader();
SubscriptionFactory getSubscriptionFactory();
ThreadFactory getThreadFactoryForAsynchronousMessageDispatch();
}

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;

View File

@ -0,0 +1,15 @@
package net.engio.mbassy.bus.publication;
/**
* A publication command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the message publication flavors provided by the message bus implementation that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*/
public interface IPublicationCommand {
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
void now();
}

View File

@ -0,0 +1,35 @@
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.MessagePublication;
import java.util.concurrent.TimeUnit;
/**
*
*
*/
public interface ISyncAsyncPublicationCommand extends IPublicationCommand {
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*
* @return A message publication that can be used to access information about the state of
*/
MessagePublication asynchronously();
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/
MessagePublication asynchronously(long timeout, TimeUnit unit);
}

View File

@ -1,4 +1,7 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import java.util.concurrent.TimeUnit;
@ -8,7 +11,7 @@ import java.util.concurrent.TimeUnit;
* @author bennidi
* Date: 11/12/12
*/
public class SyncAsyncPostCommand<T> implements IMessageBus.IPostCommand {
public class SyncAsyncPostCommand<T> implements ISyncAsyncPublicationCommand {
private T message;
private MBassador<T> mBassador;

View File

@ -0,0 +1,14 @@
package net.engio.mbassy.common;
/**
* This exception is thrown when a property value that is unavailable at runtime is accessed.
* It indicates that some parts of the runtime configuration are actually incompatible,
* i.e. one component is trying to access a feature of another component that does not
* provide the feature (e.g. asynchronous functionality within a synchronous bus)
*/
public class MissingPropertyException extends RuntimeException{
public MissingPropertyException(String message) {
super(message);
}
}

View File

@ -1,32 +1,36 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import java.util.concurrent.ExecutorService;
/**
* This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously
*
* @author bennidi
* Date: 11/23/12
*/
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware<IMessageBus> implements IHandlerInvocation<Object,Object,IMessageBus> {
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
private IHandlerInvocation delegate;
private final IHandlerInvocation delegate;
private final ExecutorService executor;
public AsynchronousHandlerInvocation(IHandlerInvocation delegate) {
super(delegate.getContext());
this.delegate = delegate;
this.executor = delegate.getContext().getRuntime().get("handler.async-service");
}
/**
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message) {
getContext().getOwningBus().getExecutor().execute(new Runnable() {
public void invoke(final Object listener, final Object message){
executor.execute(new Runnable() {
@Override
public void run() {
delegate.invoke(listener, message);
delegate.invoke(listener, message);
}
});
}

View File

@ -20,7 +20,7 @@ public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher {
}
@Override
public void dispatch(MessagePublication publication, Object message, Iterable listeners) {
public void dispatch(MessagePublication publication, Object message, Iterable listeners){
getDelegate().dispatch(publication, new MessageEnvelope(message), listeners);
}
}

View File

@ -36,7 +36,7 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
@Override
public void dispatch(MessagePublication publication, Object message, Iterable listeners) {
public void dispatch(MessagePublication publication, Object message, Iterable listeners){
if (passesFilter(message)) {
getDelegate().dispatch(publication, message, listeners);
}

View File

@ -1,19 +1,30 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public abstract class HandlerInvocation<Listener, Message> extends AbstractSubscriptionContextAware<ISyncMessageBus> implements IHandlerInvocation<Listener, Message,ISyncMessageBus>{
public abstract class HandlerInvocation<HANDLER, MESSAGE> extends AbstractSubscriptionContextAware implements IHandlerInvocation<HANDLER, MESSAGE>{
private final Collection<IPublicationErrorHandler> errorHandlers;
public HandlerInvocation(SubscriptionContext context) {
super(context);
errorHandlers = context.getErrorHandlers();
}
protected void handlePublicationError(PublicationError error){
for(IPublicationErrorHandler handler : errorHandlers)
handler.handleError(error);
}
}

View File

@ -1,23 +1,29 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.subscription.ISubscriptionContextAware;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
* message handler to process a given message.
*
* A handler invocation might come in different flavours and can be composed
* of various independent invocations be means of delegation (decorator pattern)
* of various independent invocations by means of delegation (-> decorator pattern)
*
* If an exception is thrown during handler invocation it is wrapped and propagated
* as a publication error
*
* @author bennidi
* Date: 11/23/12
*/
public interface IHandlerInvocation<Listener, Message, Bus extends ISyncMessageBus> extends ISubscriptionContextAware<Bus> {
public interface IHandlerInvocation<HANDLER, MESSAGE> extends ISubscriptionContextAware {
/**
* Invoke the message delivery logic of this handler
*
* @param listener The listener that will receive the message
* @param message The message to be delivered to the listener
* @param handler The listener that will receive the message. This can be a reference to a method object
* from the java reflection api or any other wrapper that can be used to invoke the handler
* @param message The message to be delivered to the handler. This can be any object compatible with the object
* type that the handler consumes
*/
void invoke(Listener listener, Message message);
void invoke(HANDLER handler, MESSAGE message);
}

View File

@ -1,14 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
/**
* This interface marks components that have access to the message bus that they belong to.
*
* @author bennidi
* Date: 3/1/13
*/
public interface IMessageBusAware<Bus extends ISyncMessageBus> {
Bus getBus();
}

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.subscription.ISubscriptionContextAware;
/**
* A message dispatcher provides the functionality to deliver a single message

View File

@ -24,7 +24,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
}
@Override
public void dispatch(final MessagePublication publication, final Object message, final Iterable listeners) {
public void dispatch(final MessagePublication publication, final Object message, final Iterable listeners){
publication.markDelivered();
for (Object listener : listeners) {
getInvocation().invoke(listener, message);

View File

@ -1,12 +1,10 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Uses reflection to invoke a message handler for a given message.
@ -20,36 +18,25 @@ public class ReflectiveHandlerInvocation extends HandlerInvocation{
super(context);
}
protected void handlePublicationError(PublicationError error) {
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
for (IPublicationErrorHandler handler : handlers) {
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener, Method handler) {
protected void invokeHandler(final Object message, final Object listener, Method handler){
try {
handler.invoke(listener, message);
} catch (IllegalAccessException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"The class or method is not accessible",
handler, listener, message));
} catch (IllegalArgumentException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
} catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"Message handler threw exception",
handler, listener, message));
} catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"The handler code threw an exception",
handler, listener, message));
}
}
@ -58,7 +45,7 @@ public class ReflectiveHandlerInvocation extends HandlerInvocation{
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message) {
public void invoke(final Object listener, final Object message){
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());
}
}

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
@ -9,7 +8,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
* @author bennidi
* Date: 3/31/13
*/
public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware<IMessageBus> implements IHandlerInvocation<Object,Object,IMessageBus> {
public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation<Object,Object> {
private IHandlerInvocation delegate;
@ -22,7 +21,7 @@ public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAw
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message) {
public void invoke(final Object listener, final Object message){
synchronized (listener){
delegate.invoke(listener, message);
}

View File

@ -1,28 +1,21 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.dispatch.ISubscriptionContextAware;
/**
* The base implementation for subscription context aware objects (mightily obvious :)
*
* @author bennidi
* Date: 3/1/13
*/
public class AbstractSubscriptionContextAware<Bus extends ISyncMessageBus> implements ISubscriptionContextAware<Bus> {
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware {
private final SubscriptionContext<Bus> context;
private final SubscriptionContext context;
public AbstractSubscriptionContextAware(SubscriptionContext<Bus> context) {
public AbstractSubscriptionContextAware(SubscriptionContext context) {
this.context = context;
}
public SubscriptionContext<Bus> getContext() {
public SubscriptionContext getContext() {
return context;
}
@Override
public Bus getBus() {
return context.getOwningBus();
}
}

View File

@ -1,7 +1,4 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.subscription.SubscriptionContext;
package net.engio.mbassy.subscription;
/**
* This interface marks components that have access to the subscription context.
@ -9,7 +6,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
* @author bennidi
* Date: 3/1/13
*/
public interface ISubscriptionContextAware<Bus extends ISyncMessageBus> extends IMessageBusAware<Bus> {
public interface ISubscriptionContextAware{
/**
* Get the subscription context associated with this object

View File

@ -44,8 +44,9 @@ public class Subscription {
}
public void publish(MessagePublication publication, Object message) {
dispatcher.dispatch(publication, message, listeners);
public void publish(MessagePublication publication, Object message){
if(listeners.size() > 0)
dispatcher.dispatch(publication, message, listeners);
}
public int getPriority() {

View File

@ -1,8 +1,12 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.RuntimeProvider;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import java.util.Collection;
/**
* The subscription context holds all (meta)data/objects that are relevant to successfully publish
* a message within a subscription. A one-to-one relation between a subscription and
@ -12,27 +16,23 @@ import net.engio.mbassy.listener.MessageHandlerMetadata;
* @author bennidi
* Date: 11/23/12
*/
public class SubscriptionContext<Bus extends ISyncMessageBus> {
public class SubscriptionContext implements RuntimeProvider {
private Bus owningBus;
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandlerMetadata handlerMetadata;
private MessageHandlerMetadata handlerMetadata;
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
public SubscriptionContext(Bus owningBus, MessageHandlerMetadata handlerMetadata) {
this.owningBus = owningBus;
private BusRuntime runtime;
public SubscriptionContext(BusRuntime runtime, MessageHandlerMetadata handlerMetadata,
Collection<IPublicationErrorHandler> errorHandlers) {
this.runtime = runtime;
this.handlerMetadata = handlerMetadata;
this.errorHandlers = errorHandlers;
}
/**
* Get a reference to the message bus this context belongs to
*
* @return
*/
public Bus getOwningBus() {
return owningBus;
}
/**
* Get the meta data that specifies the characteristics of the message handler
* that is associated with this context
@ -43,4 +43,17 @@ public class SubscriptionContext<Bus extends ISyncMessageBus> {
return handlerMetadata;
}
/**
* Get the error handlers registered with the enclosing bus.
* @return
*/
public Collection<IPublicationErrorHandler> getErrorHandlers(){
return errorHandlers;
}
@Override
public BusRuntime getRuntime() {
return runtime;
}
}

View File

@ -1,15 +1,16 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.MessageBusException;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.util.Collection;
/**
* The subscription factory is used to create an empty subscription for specific message handler.
@ -17,16 +18,12 @@ import java.lang.reflect.Modifier;
*/
public class SubscriptionFactory {
private ISyncMessageBus bus;
private static final String ErrorHandlers = "error.handlers";
public SubscriptionFactory setBus(ISyncMessageBus bus) {
this.bus = bus;
return this;
}
public Subscription createSubscription(MessageHandlerMetadata handlerMetadata) throws MessageBusException{
public Subscription createSubscription(BusRuntime runtime, MessageHandlerMetadata handlerMetadata) throws MessageBusException{
try {
SubscriptionContext context = new SubscriptionContext(bus, handlerMetadata);
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(ErrorHandlers);
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher, handlerMetadata.useStrongReferences()

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.listener.MessageHandlerMetadata;
@ -9,7 +10,9 @@ import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Todo: Add javadoc
* The subscription managers primary task is consistently handle new and existing subscriptions
* and to synchronize concurrent access to them efficiently. It takes care of properly registering and
* unregistering message listeners and is a core component of each bus implementation
*
* @author bennidi
* Date: 5/11/13
@ -41,9 +44,12 @@ public class SubscriptionManager {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory) {
private final BusRuntime runtime;
public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory, BusRuntime runtime) {
this.metadataReader = metadataReader;
this.subscriptionFactory = subscriptionFactory;
this.runtime = runtime;
}
@ -91,7 +97,7 @@ public class SubscriptionManager {
// create subscriptions for all detected message handlers
for (MessageHandlerMetadata messageHandler : messageHandlers) {
// create the subscription
subscriptionsByListener.add(subscriptionFactory.createSubscription(messageHandler));
subscriptionsByListener.add(subscriptionFactory.createSubscription(runtime, messageHandler));
}
// this will acquire a write lock and handle the case when another thread already subscribed
// this particular listener in the mean-time
@ -114,7 +120,9 @@ public class SubscriptionManager {
readWriteLock.writeLock().lock();
// basically this is a deferred double check
// it's an ugly pattern but necessary because atomic upgrade from read to write lock
// is not possible and using a write lock from the beginning with will dramatically decrease performance
// is not possible
// the alternative of using a write lock from the beginning would decrease performance dramatically
// because of the huge number of reads compared to writes
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
if (subscriptionsByListener == null) {

View File

@ -4,6 +4,7 @@ import junit.framework.Assert;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.IConcurrentSet;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
@ -26,7 +27,14 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// Shared state
protected final int numberOfElements = 100000;
protected final int numberOfThreads = 50;
protected Set gcProtector = new HashSet();
@Before
public void beforeTest(){
super.beforeTest();
gcProtector = new HashSet();
}
protected abstract IConcurrentSet createSet();
@ -36,7 +44,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final LinkedList<Object> duplicates = new LinkedList<Object>();
final HashSet<Object> distinct = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final IConcurrentSet testSet = createSet();
Random rand = new Random();
// getAll set of distinct objects and list of duplicates
@ -54,32 +62,34 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
for (Object src : duplicates) {
testSetWeak.add(src);
testSet.add(src);
}
}
}, numberOfThreads);
// check that the control set and the test set contain the exact same elements
assertEquals(distinct.size(), testSetWeak.size());
assertEquals(distinct.size(), testSet.size());
for (Object uniqueObject : distinct) {
assertTrue(testSetWeak.contains(uniqueObject));
assertTrue(testSet.contains(uniqueObject));
}
}
@Test
@Test()
public void testIterationWithConcurrentRemoval() {
final IConcurrentSet<AtomicInteger> testSetWeak = createSet();
final IConcurrentSet<AtomicInteger> testSet = createSet();
final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) {
testSetWeak.add(new AtomicInteger());
AtomicInteger element = new AtomicInteger();
testSet.add(element);
gcProtector.add(element);
}
Runnable incrementer = new Runnable() {
@Override
public void run() {
while(testSetWeak.size() > 100){
for(AtomicInteger element : testSetWeak)
while(testSet.size() > 100){
for(AtomicInteger element : testSet)
element.incrementAndGet();
}
@ -89,10 +99,10 @@ public abstract class ConcurrentSetTest extends AssertSupport {
Runnable remover = new Runnable() {
@Override
public void run() {
while(testSetWeak.size() > 100){
for(AtomicInteger element : testSetWeak)
if(rand.nextInt() % 3 == 0)
testSetWeak.remove(element);
while(testSet.size() > 100){
for(AtomicInteger element : testSet)
if(rand.nextInt() % 3 == 0 && testSet.size() > 100)
testSet.remove(element);
}
}
};
@ -100,9 +110,13 @@ public abstract class ConcurrentSetTest extends AssertSupport {
ConcurrentExecutor.runConcurrent(20, incrementer, incrementer, remover);
Set<Integer> counts = new HashSet<Integer>();
for (AtomicInteger count : testSetWeak) {
for (AtomicInteger count : testSet) {
counts.add(count.get());
}
// all atomic integers should have been visited by the the incrementer
// the same number of times
// in other words: they have either been removed at some point or incremented in each
// iteration such that all remaining atomic integers must share the same value
assertEquals(1, counts.size());
}
@ -113,7 +127,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final IConcurrentSet testSet = createSet();
// getAll set of distinct objects and mark a subset of those for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
@ -128,7 +142,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
for (Object src : source) {
testSetWeak.add(src);
testSet.add(src);
}
}
}, numberOfThreads);
@ -138,20 +152,20 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
for (Object src : toRemove) {
testSetWeak.remove(src);
testSet.remove(src);
}
}
}, numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSetWeak) {
for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar));
}
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(source.size() - toRemove.size(), testSetWeak.size());
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSetWeak.contains(src));
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
}
}
@ -160,7 +174,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final IConcurrentSet testSet = createSet();
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
@ -176,35 +190,35 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
for (Object src : source) {
testSetWeak.add(src);
testSet.add(src);
if (toRemove.contains(src))
testSetWeak.remove(src);
testSet.remove(src);
}
}
}, numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSetWeak) {
for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar));
}
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(source.size() - toRemove.size(), testSetWeak.size());
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSetWeak.contains(src));
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
}
}
@Test
public void testCompleteRemoval() {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final IConcurrentSet testSet = createSet();
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSetWeak.add(candidate);
testSet.add(candidate);
}
// getAll test set by adding the candidates
@ -213,7 +227,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
for (Object src : source) {
testSetWeak.remove(src);
testSet.remove(src);
}
}
}, numberOfThreads);
@ -221,9 +235,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(0, testSetWeak.size());
assertEquals(0, testSet.size());
for(Object src : source){
assertFalse(testSetWeak.contains(src));
assertFalse(testSet.contains(src));
}
}
@ -340,5 +354,8 @@ public abstract class ConcurrentSetTest extends AssertSupport {
return result;
}
protected void protectFromGarbageCollector(Set elements){
for(Object element : elements)
gcProtector.add(element);
}
}

View File

@ -1,12 +1,14 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.listeners.ObjectListener;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
@ -19,6 +21,11 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class DeadMessageTest extends MessageBusTest{
@Before
public void beforeTest(){
DeadMessagHandler.deadMessages.set(0);
}
@Test
public void testDeadMessage(){
@ -56,15 +63,52 @@ public class DeadMessageTest extends MessageBusTest{
assertEquals(InstancesPerListener * IterationsPerThread * ConcurrentUnits, DeadMessagHandler.deadMessages.get());
}
@Test
public void testUnsubscribingAllListeners() {
final MBassador bus = getBus(BusConfiguration.Default());
ListenerFactory deadMessageListener = new ListenerFactory()
.create(InstancesPerListener, DeadMessagHandler.class)
.create(InstancesPerListener, Object.class);
ListenerFactory objectListener = new ListenerFactory()
.create(InstancesPerListener, ObjectListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits);
// Only dead message handlers available
bus.post(new Object()).now();
// The message should be caught as dead message since there are no subscribed listeners
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get());
// Clear deadmessage for future tests
DeadMessagHandler.deadMessages.set(0);
// Add object listeners and publish again
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits);
bus.post(new Object()).now();
// verify that no dead message events were produced
assertEquals(0, DeadMessagHandler.deadMessages.get());
// Unsubscribe all object listeners
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits);
// Only dead message handlers available
bus.post(new Object()).now();
// The message should be caught, as it's the only listener
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get());
}
public static class DeadMessagHandler {
private static final AtomicInteger deadMessages = new AtomicInteger(0);
@Handler
public void handle(DeadMessage message){
deadMessages.incrementAndGet();
}
public void handle(DeadMessage message){
deadMessages.incrementAndGet();
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.FilteredMessage;
@ -31,7 +31,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(new BusConfiguration());
MBassador bus = getBus(BusConfiguration.Default());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
@ -56,7 +56,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(new BusConfiguration());
MBassador bus = getBus(BusConfiguration.Default());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listeners.*;
@ -26,7 +26,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.synchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
Runnable publishAndCheck = new Runnable() {
@ -60,7 +60,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@ -92,7 +92,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
final MBassador bus = getBus(BusConfiguration.Default(), listeners);
final MessageManager messageManager = new MessageManager();
@ -130,7 +130,7 @@ public class MBassadorTest extends MessageBusTest {
}
};
final MBassador bus = new MBassador(new BusConfiguration());
final MBassador bus = new MBassador(BusConfiguration.Default());
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -1,7 +1,7 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import org.junit.Test;

View File

@ -1,5 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.listeners.*;
@ -10,6 +11,7 @@ import net.engio.mbassy.subscription.SubscriptionManager;
import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
/**
*
@ -168,7 +170,7 @@ public class SubscriptionManagerTest extends AssertSupport {
@Test
public void testStrongListenerSubscription() throws Exception {
ListenerFactory listeners = listeners(CustomInvocationListener.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
listeners.clear();
@ -186,7 +188,7 @@ public class SubscriptionManagerTest extends AssertSupport {
Overloading.ListenerBase.class,
Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
@ -196,6 +198,12 @@ public class SubscriptionManagerTest extends AssertSupport {
runTestWith(listeners, expectedSubscriptions);
}
private BusRuntime mockedRuntime(){
return new BusRuntime(null)
.add("error.handlers", Collections.EMPTY_SET)
.add("handler.async-service", null);
}
private ListenerFactory listeners(Class ...listeners){
ListenerFactory factory = new ListenerFactory();
for(Class listener : listeners){
@ -205,7 +213,7 @@ public class SubscriptionManagerTest extends AssertSupport {
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);

View File

@ -1,6 +1,9 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.*;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.SyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
@ -183,7 +186,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new SyncMessageBus(new SyncBusConfiguration());
return new SyncMessageBus(BusConfiguration.Default());
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.MessageBusTest;

View File

@ -3,10 +3,12 @@ package net.engio.mbassy;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
/**
*
@ -16,6 +18,10 @@ import java.util.Random;
*/
public class WeakConcurrentSetTest extends ConcurrentSetTest{
@Override
protected IConcurrentSet createSet() {
return new WeakConcurrentSet();

View File

@ -17,7 +17,7 @@ public class AssertSupport {
private Runtime runtime = Runtime.getRuntime();
protected Logger logger = LoggerFactory.getLogger(getClass().getSimpleName());
private long testExecutionStart;
private volatile long testExecutionStart;
@Rule
public TestName name = new TestName();

View File

@ -3,8 +3,7 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.messages.MessageTypes;
import org.junit.Before;
@ -16,7 +15,7 @@ import org.junit.Before;
* @author bennidi
* Date: 3/2/13
*/
public abstract class MessageBusTest<Bus extends ISyncMessageBus> extends AssertSupport {
public abstract class MessageBusTest extends AssertSupport {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.common;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.PubSubSupport;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.Iterator;
@ -41,7 +41,7 @@ public class TestUtil {
};
}
public static Runnable subscriber(final ISyncMessageBus bus, final ListenerFactory listeners){
public static Runnable subscriber(final PubSubSupport bus, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
@ -54,7 +54,7 @@ public class TestUtil {
};
}
public static Runnable unsubscriber(final ISyncMessageBus bus, final ListenerFactory listeners){
public static Runnable unsubscriber(final PubSubSupport bus, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
@ -67,7 +67,7 @@ public class TestUtil {
};
}
public static void setup(final ISyncMessageBus bus, final List<Object> listeners, int numberOfThreads) {
public static void setup(final PubSubSupport bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;
if(listeners.size() >= numberOfThreads){