Further simplification of mbassador

This commit is contained in:
nathan 2015-02-03 22:32:50 +01:00
parent d17183d592
commit 8ed29f6b7f
59 changed files with 315 additions and 856 deletions

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.listener;
package net.engio.mbassy.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.listener;
package net.engio.mbassy.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.listener;
package net.engio.mbassy.annotations;
import java.lang.annotation.*;

View File

@ -1,5 +1,10 @@
package net.engio.mbassy.bus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.config.Feature;
@ -9,11 +14,6 @@ import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* The base class for all message bus implementations.
*
@ -29,70 +29,63 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
private final SubscriptionManager subscriptionManager;
private final BusRuntime runtime;
public AbstractPubSubSupport(IBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers());
// configure the pub sub feature
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider()
.createManager(pubSubFeature.getMetadataReader(),
pubSubFeature.getSubscriptionFactory(), runtime);
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers());
this.publicationFactory = pubSubFeature.getPublicationFactory();
}
protected MessagePublication.Factory getPublicationFactory() {
return publicationFactory;
return this.publicationFactory;
}
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers);
return Collections.unmodifiableCollection(this.errorHandlers);
}
@Override
public boolean unsubscribe(Object listener) {
return subscriptionManager.unsubscribe(listener);
return this.subscriptionManager.unsubscribe(listener);
}
@Override
public void subscribe(Object listener) {
subscriptionManager.subscribe(listener);
this.subscriptionManager.subscribe(listener);
}
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this){
errorHandlers.add(handler);
this.errorHandlers.add(handler);
}
}
@Override
public BusRuntime getRuntime() {
return runtime;
}
protected IMessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
Class<? extends Object> class1 = message.getClass();
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(class1);
if ((subscriptions == null || subscriptions.isEmpty()) && !class1.equals(DeadMessage.class)) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(runtime, subscriptions, new DeadMessage(message));
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
} else {
return getPublicationFactory().createPublication(runtime, subscriptions, message);
return getPublicationFactory().createPublication(this, subscriptions, message);
}
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
return subscriptionManager.getSubscriptionsByMessageType(messageType);
protected Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionManager.getSubscriptionsByMessageType(messageType);
}
public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}

View File

@ -1,11 +1,5 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -13,14 +7,19 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
*
* @param <T>
* @param <P>
*/
public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends AbstractPubSubSupport<T> implements IMessageBus<T, P> {
public abstract class AbstractSyncAsyncMessageBus<T>
extends AbstractPubSubSupport<T> implements IMessageBus<T> {
// executor for asynchronous message handlers
private final ExecutorService executor;
@ -36,15 +35,13 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
// configure asynchronous message dispatch
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
pendingMessages = asyncDispatch.getPendingMessages();
dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
this.pendingMessages = asyncDispatch.getPendingMessages();
this.dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
initDispatcherThreads(asyncDispatch);
// configure asynchronous handler invocation
Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class);
this.executor = asyncInvocation.getExecutor();
getRuntime().add(BusRuntime.Properties.AsynchronousHandlerExecutor, executor);
}
// initialize the dispatch workers
@ -53,11 +50,12 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
// each thread will run forever and process incoming
// message publication requests
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
@Override
public void run() {
while (true) {
IMessagePublication publication = null;
try {
publication = pendingMessages.take();
publication = AbstractSyncAsyncMessageBus.this.pendingMessages.take();
publication.execute();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -69,7 +67,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
}
});
dispatcher.setName("Message dispatcher");
dispatchers.add(dispatcher);
this.dispatchers.add(dispatcher);
dispatcher.start();
}
}
@ -78,7 +76,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
// this method queues a message delivery request
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) {
try {
pendingMessages.put(publication);
this.pendingMessages.put(publication);
return publication.markScheduled();
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
@ -89,7 +87,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
// this method queues a message delivery request
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(publication, timeout, unit)
return this.pendingMessages.offer(publication, timeout, unit)
? publication.markScheduled()
: publication;
} catch (InterruptedException e) {
@ -106,20 +104,22 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
@Override
public void shutdown() {
for (Thread dispatcher : dispatchers) {
for (Thread dispatcher : this.dispatchers) {
dispatcher.interrupt();
}
if(executor != null) executor.shutdown();
if(this.executor != null) {
this.executor.shutdown();
}
}
@Override
public boolean hasPendingMessages() {
return pendingMessages.size() > 0;
return this.pendingMessages.size() > 0;
}
@Override
public Executor getExecutor() {
return executor;
return this.executor;
}
}

View File

@ -1,59 +0,0 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.error.MissingPropertyException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Message bus implementations potentially vary in the features they provide and consequently in the components and properties
* they expose. The runtime is a container for all those dynamic properties and components and is meant to be passed around
* between collaborating objects such that they may access the different functionality provided by the bus implementation
* they all belong to.
*
* It is the responsibility of the bus implementation to create and configure the runtime according to its capabilities,
*
*/
public class BusRuntime {
public static class Properties{
public static final String ErrorHandlers = "error.handlers";
public static final String AsynchronousHandlerExecutor = "handler.async.executor";
}
private PubSubSupport provider;
private Map<String, Object> properties = new HashMap<String, Object>();
public BusRuntime(PubSubSupport provider) {
this.provider = provider;
}
public <T> T get(String key){
if(!contains(key))
throw new MissingPropertyException("The property " + key + " is not available in this runtime");
else return (T) properties.get(key);
}
public PubSubSupport getProvider(){
return provider;
}
public Collection<String> getKeys(){
return properties.keySet();
}
public BusRuntime add(String key, Object property){
properties.put(key, property);
return this;
}
public boolean contains(String key){
return properties.containsKey(key);
}
}

View File

@ -1,16 +1,15 @@
package net.engio.mbassy.bus;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
import java.util.concurrent.TimeUnit;
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> implements IMessageBus<T, SyncAsyncPostCommand<T>> {
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T> implements IMessageBus<T> {
public MBassador(IBusConfiguration configuration) {
super(configuration);
@ -23,22 +22,13 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
.addFeature(Feature.AsynchronousMessageDispatch.Default()));
}
public IMessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));
}
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
}
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message.
*
* @param message
*/
@Override
public void publish(T message) {
try {
IMessagePublication publication = createMessagePublication(message);
@ -49,13 +39,34 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
.setCause(e)
.setPublishedObject(message));
}
}
@Override
public SyncAsyncPostCommand<T> post(T message) {
return new SyncAsyncPostCommand<T>(this, message);
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*
* @return A message publication that can be used to access information about it's state
*/
public IMessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));
}
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
}
}

View File

@ -3,6 +3,7 @@ package net.engio.mbassy.bus;
import java.util.Collection;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.subscription.Subscription;
/**
@ -23,10 +24,10 @@ public class MessagePublication implements IMessagePublication {
// message publications can be referenced by multiple threads to query publication progress
private volatile State state = State.Initial;
private volatile boolean delivered = false;
private final BusRuntime runtime;
private PubSubSupport pubSub;
protected MessagePublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message, State initialState) {
this.runtime = runtime;
protected MessagePublication(PubSubSupport pubSub, Collection<Subscription> subscriptions, Object message, State initialState) {
this.pubSub = pubSub;
this.subscriptions = subscriptions;
this.message = message;
this.state = initialState;
@ -50,7 +51,7 @@ public class MessagePublication implements IMessagePublication {
// if the message has not been marked delivered by the dispatcher
if (!this.delivered) {
if (!isDeadEvent()) {
this.runtime.getProvider().publish(new DeadMessage(this.message));
this.pubSub.publish(new DeadMessage(this.message));
}
}
}
@ -100,8 +101,8 @@ public class MessagePublication implements IMessagePublication {
public static class Factory {
public IMessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(runtime, subscriptions, message, State.Initial);
public IMessagePublication createPublication(PubSubSupport pubSub, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(pubSub, subscriptions, message, State.Initial);
}
}

View File

@ -1,18 +1,18 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.ErrorHandlingSupport;
import net.engio.mbassy.bus.common.GenericMessagePublicationSupport;
import net.engio.mbassy.bus.common.PubSubSupport;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
* A message bus implementation that offers only synchronous message publication. Using this bus
* will not create any new threads.
*
*/
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, SyncMessageBus.SyncPostCommand>{
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements IMessageBus<T> {
public SyncMessageBus(IBusConfiguration configuration) {
@ -33,21 +33,28 @@ public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements PubSu
}
@Override
public SyncPostCommand post(T message) {
return new SyncPostCommand(message);
public IMessagePublication publishAsync(T message) {
publish(message);
return null;
}
public class SyncPostCommand implements IPublicationCommand {
@Override
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
publish(message);
return null;
}
private T message;
@Override
public Executor getExecutor() {
return null;
}
public SyncPostCommand(T message) {
this.message = message;
}
@Override
public boolean hasPendingMessages() {
return false;
}
@Override
public void now() {
publish(message);
}
@Override
public void shutdown() {
}
}

View File

@ -1,25 +0,0 @@
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
* This interface is meant to be implemented by different bus implementations to offer a consistent way
* to plugin different flavors of message publication.
*
* The parametrization of the IPostCommand influences which publication flavours are available.
*
*/
public interface GenericMessagePublicationSupport<T, P extends IPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport{
/**
* Publish a message to the bus using on of its supported message publication mechanisms. The supported
* mechanisms depend on the available implementation and are exposed as subclasses of IPublicationCommand.
* The standard mechanism is the synchronous dispatch which will publish the message in the current thread
* and returns after every matching handler has been invoked. @See IPublicationCommand.
*
* @param message
* @return
*/
P post(T message);
}

View File

@ -1,7 +1,5 @@
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
import java.util.concurrent.Executor;
/**
@ -57,14 +55,7 @@ import java.util.concurrent.Executor;
* @Author bennidi
* Date: 2/8/12
*/
public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends GenericMessagePublicationSupport<T, P>{
/**
* {@inheritDoc}
*/
@Override
P post(T message);
public interface IMessageBus<T> extends PubSubSupport<T>, ErrorHandlingSupport {
/**
* Get the executor service that is used for asynchronous message publications.
@ -89,6 +80,4 @@ public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand>
* to further use the message bus.
*/
void shutdown();
}

View File

@ -1,12 +0,0 @@
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
* @author bennidi
* Date: 3/29/13
*/
public interface ISyncMessageBus<T, P extends IPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P>{
}

View File

@ -1,12 +1,16 @@
package net.engio.mbassy.bus.common;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.IMessagePublication;
/**
* This interface defines the very basic message publication semantics according to the publish subscribe pattern.
* Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its
* handlers will be registered and start to receive matching message publications.
*
*/
public interface PubSubSupport<T> extends RuntimeProvider{
public interface PubSubSupport<T> {
/**
* Subscribe all handlers of the given listener. Any listener is only subscribed once
@ -38,4 +42,28 @@ public interface PubSubSupport<T> extends RuntimeProvider{
* @param message
*/
void publish(T message);
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*
* @return A message publication that can be used to access information about it's state
*/
IMessagePublication publishAsync(T message);
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/
IMessagePublication publishAsync(T message, long timeout, TimeUnit unit);
}

View File

@ -1,11 +0,0 @@
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.BusRuntime;
/**
* Each message bus provides a runtime object to access its dynamic features and runtime configuration.
*/
public interface RuntimeProvider {
BusRuntime getRuntime();
}

View File

@ -17,17 +17,13 @@ public class BusConfiguration implements IBusConfiguration {
@Override
public <T extends Feature> T getFeature(Class<T> feature) {
return (T)features.get(feature);
return (T)this.features.get(feature);
}
@Override
public IBusConfiguration addFeature(Feature feature) {
features.put(feature.getClass(), feature);
this.features.put(feature.getClass(), feature);
return this;
}
@Override
public IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler) {
return null; // TODO: implement configuration validation
}
}

View File

@ -1,10 +0,0 @@
package net.engio.mbassy.bus.config;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 8/29/14
*/
public interface ConfigurationErrorHandler {
}

View File

@ -1,14 +1,17 @@
package net.engio.mbassy.bus.config;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A feature defines the configuration of a specific functionality of a message bus.
@ -19,38 +22,17 @@ import java.util.concurrent.atomic.AtomicInteger;
public interface Feature {
class SyncPubSub implements Feature{
class SyncPubSub implements Feature {
public static final SyncPubSub Default(){
return new SyncPubSub()
.setMetadataReader(new MetadataReader())
.setPublicationFactory(new MessagePublication.Factory())
.setSubscriptionFactory(new SubscriptionFactory())
.setSubscriptionManagerProvider(new SubscriptionManagerProvider());
.setPublicationFactory(new MessagePublication.Factory());
}
private MessagePublication.Factory publicationFactory;
private MetadataReader metadataReader;
private SubscriptionFactory subscriptionFactory;
private ISubscriptionManagerProvider subscriptionManagerProvider;
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public SyncPubSub setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public SyncPubSub setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
public MetadataReader getMetadataReader() {
return metadataReader;
@ -76,7 +58,7 @@ public interface Feature {
}
}
class AsynchronousHandlerInvocation implements Feature{
class AsynchronousHandlerInvocation implements Feature {
protected static final ThreadFactory MessageHandlerThreadFactory = new ThreadFactory() {
@ -114,7 +96,7 @@ public interface Feature {
}
}
class AsynchronousMessageDispatch implements Feature{
class AsynchronousMessageDispatch implements Feature {
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {

View File

@ -22,9 +22,4 @@ public interface IBusConfiguration{
<T extends Feature> T getFeature(Class<T> feature);
IBusConfiguration addFeature(Feature feature);
IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler);
}

View File

@ -6,7 +6,7 @@ package net.engio.mbassy.bus.error;
* i.e. one component is trying to access a feature of another component that does not
* provide the feature (e.g. asynchronous functionality within a synchronous bus)
*/
public class MissingPropertyException extends RuntimeException{
public class MissingPropertyException extends RuntimeException {
public MissingPropertyException(String message) {
super(message);

View File

@ -1,15 +0,0 @@
package net.engio.mbassy.bus.publication;
/**
* A publication command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the message publication flavors provided by the message bus implementation that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*/
public interface IPublicationCommand {
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
void now();
}

View File

@ -1,35 +0,0 @@
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.IMessagePublication;
import java.util.concurrent.TimeUnit;
/**
*
*
*/
public interface ISyncAsyncPublicationCommand extends IPublicationCommand {
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
*
* @return A message publication that can be used to access information about the state of
*/
IMessagePublication asynchronously();
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/
IMessagePublication asynchronously(long timeout, TimeUnit unit);
}

View File

@ -1,38 +0,0 @@
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.IMessagePublication;
import java.util.concurrent.TimeUnit;
/**
* This post command provides access to standard synchronous and asynchronous dispatch
*
* @author bennidi
* Date: 11/12/12
*/
public class SyncAsyncPostCommand<T> implements ISyncAsyncPublicationCommand {
private T message;
private MBassador<T> mBassador;
public SyncAsyncPostCommand(MBassador<T> mBassador, T message) {
this.mBassador = mBassador;
this.message = message;
}
@Override
public void now() {
mBassador.publish(message);
}
@Override
public IMessagePublication asynchronously() {
return mBassador.publishAsync(message);
}
@Override
public IMessagePublication asynchronously(long timeout, TimeUnit unit) {
return mBassador.publishAsync(message, timeout, unit);
}
}

View File

@ -8,7 +8,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
/**
* @author bennidi

View File

@ -1,30 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
* A delegating dispatcher wraps additional logic around a given delegate. Essentially its
* an implementation of the decorator pattern.
*
* @author bennidi
* Date: 3/1/13
*/
public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
private final IMessageDispatcher delegate;
public DelegatingMessageDispatcher(IMessageDispatcher delegate) {
super(delegate.getContext());
this.delegate = delegate;
}
protected IMessageDispatcher getDelegate() {
return delegate;
}
@Override
public IHandlerInvocation getInvocation() {
return delegate.getInvocation();
}
}

View File

@ -1,30 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.util.Collection;
/**
* This is the base class for handler invocations that already implements all context related methods only leaving the implementation of the actual invocation mechanism to the concrete subclass.
*
* @author bennidi
* Date: 3/29/13
*/
public abstract class HandlerInvocation<HANDLER, MESSAGE> extends AbstractSubscriptionContextAware implements IHandlerInvocation<HANDLER, MESSAGE>{
private final Collection<IPublicationErrorHandler> errorHandlers;
public HandlerInvocation(SubscriptionContext context) {
super(context);
errorHandlers = context.getErrorHandlers();
}
protected void handlePublicationError(PublicationError error){
for(IPublicationErrorHandler handler : errorHandlers)
handler.handleError(error);
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.subscription.ISubscriptionContextAware;
import java.lang.reflect.Method;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
@ -15,15 +15,16 @@ import net.engio.mbassy.subscription.ISubscriptionContextAware;
* @author bennidi
* Date: 11/23/12
*/
public interface IHandlerInvocation<HANDLER, MESSAGE> extends ISubscriptionContextAware {
public interface IHandlerInvocation {
/**
* Invoke the message delivery logic of this handler
*
* @param handler The listener that will receive the message. This can be a reference to a method object
* @param listener The listener that will receive the message. This can be a reference to a method object
* from the java reflection api or any other wrapper that can be used to invoke the handler
* @param message The message to be delivered to the handler. This can be any object compatible with the object
* type that the handler consumes
* @param handler The handler (method) that will be called via reflection
*/
void invoke(HANDLER handler, MESSAGE message);
void invoke(Object listener, Object message, Method handler) throws Throwable;
}

View File

@ -1,42 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.ISubscriptionContextAware;
/**
* A message dispatcher provides the functionality to deliver a single message
* to a set of listeners. A message dispatcher uses a message context to access
* all information necessary for the message delivery.
* <p/>
* The delivery of a single message to a single listener is responsibility of the
* handler invocation object associated with the dispatcher.
* <p/>
* Implementations if IMessageDispatcher are partially designed using decorator pattern
* such that it is possible to compose different message dispatchers into dispatcher chains
* to achieve more complex dispatch logic.
*
* @author bennidi
* Date: 11/23/12
*/
public interface IMessageDispatcher extends ISubscriptionContextAware {
/**
* Delivers the given message to the given set of listeners.
* Delivery may be delayed, aborted or restricted in various ways, depending
* on the configuration of the dispatcher
*
* @param publication The message publication that initiated the dispatch
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
void dispatch(IMessagePublication publication, Object message, Iterable listeners);
/**
* Get the handler invocation that will be used to deliver the
* message to each listener.
*
* @return the handler invocation that will be used to deliver the
* message to each listener
*/
IHandlerInvocation getInvocation();
}

View File

@ -1,38 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* Standard implementation for direct, unfiltered message delivery.
* <p/>
* For each message delivery, this dispatcher iterates over the listeners
* and uses the previously provided handler invocation to deliver the message
* to each listener
*
* @author bennidi
* Date: 11/23/12
*/
public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
private final IHandlerInvocation invocation;
public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
super(context);
this.invocation = invocation;
}
@Override
public void dispatch(final IMessagePublication publication, final Object message, final Iterable listeners){
publication.markDelivered();
for (Object listener : listeners) {
getInvocation().invoke(listener, message);
}
}
@Override
public IHandlerInvocation getInvocation() {
return invocation;
}
}

View File

@ -1,9 +1,5 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
@ -12,40 +8,14 @@ import java.lang.reflect.Method;
* @author bennidi
* Date: 11/23/12
*/
public class ReflectiveHandlerInvocation extends HandlerInvocation{
public class ReflectiveHandlerInvocation implements IHandlerInvocation {
public ReflectiveHandlerInvocation(SubscriptionContext context) {
super(context);
public ReflectiveHandlerInvocation() {
super();
}
protected void invokeHandler(final Object message, final Object listener, Method handler){
try {
handler.invoke(listener, message);
} catch (IllegalAccessException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"The class or method is not accessible",
handler, listener, message));
} catch (IllegalArgumentException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
} catch (InvocationTargetException e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"Message handler threw exception",
handler, listener, message));
} catch (Throwable e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"The handler code threw an exception",
handler, listener, message));
}
}
/**
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message){
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());
public void invoke(final Object listener, final Object message, Method handler) throws Throwable {
handler.invoke(listener, message);
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import java.lang.reflect.Method;
/**
* Synchronizes message handler invocations for all handlers that specify @Synchronized
@ -8,12 +8,11 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
* @author bennidi
* Date: 3/31/13
*/
public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation<Object,Object> {
public class SynchronizedHandlerInvocation implements IHandlerInvocation {
private IHandlerInvocation delegate;
public SynchronizedHandlerInvocation(IHandlerInvocation delegate) {
super(delegate.getContext());
this.delegate = delegate;
}
@ -21,9 +20,9 @@ public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAw
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message){
public void invoke(final Object listener, final Object message, Method handler) throws Throwable {
synchronized (listener){
delegate.invoke(listener, message);
this.delegate.invoke(listener, message, handler);
}
}

View File

@ -3,6 +3,8 @@ package net.engio.mbassy.listener;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Synchronized;
import net.engio.mbassy.common.ReflectionUtils;
/**

View File

@ -4,6 +4,7 @@ import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.common.ReflectionUtils;
/**

View File

@ -1,21 +0,0 @@
package net.engio.mbassy.subscription;
/**
* The base implementation for subscription context aware objects (mightily obvious :)
*
* @author bennidi
* Date: 3/1/13
*/
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware {
private final SubscriptionContext context;
public AbstractSubscriptionContextAware(SubscriptionContext context) {
this.context = context;
}
public SubscriptionContext getContext() {
return context;
}
}

View File

@ -1,17 +0,0 @@
package net.engio.mbassy.subscription;
/**
* This interface marks components that have access to the subscription context.
*
* @author bennidi
* Date: 3/1/13
*/
public interface ISubscriptionContextAware{
/**
* Get the subscription context associated with this object
*
* @return the subscription context associated with this object
*/
SubscriptionContext getContext();
}

View File

@ -1,9 +0,0 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public interface ISubscriptionManagerProvider {
SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime);
}

View File

@ -1,12 +1,19 @@
package net.engio.mbassy.subscription;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.listener.MessageHandler;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (exlcuding subclasses) of a SingleMessageHandler.class
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
* will be referenced in the subscription created for SingleMessageHandler.class.
*
* There will be as many unique subscription objects per message listener class as there are message handlers
@ -18,15 +25,22 @@ import net.engio.mbassy.dispatch.IMessageDispatcher;
*/
public class Subscription {
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
private final IHandlerInvocation invocation;
protected final IConcurrentSet<Object> listeners;
private final IMessageDispatcher dispatcher;
Subscription(MessageHandler handler, Collection<IPublicationErrorHandler> errorHandlers,
IHandlerInvocation invocation, IConcurrentSet<Object> listeners) {
private final SubscriptionContext context;
Subscription(SubscriptionContext context, IMessageDispatcher dispatcher, IConcurrentSet<Object> listeners) {
this.context = context;
this.dispatcher = dispatcher;
this.handlerMetadata = handler;
this.errorHandlers = errorHandlers;
this.invocation = invocation;
this.listeners = listeners;
}
@ -37,7 +51,7 @@ public class Subscription {
* @return
*/
public boolean belongsTo(Class<?> listener){
return this.context.getHandlerMetadata().isFromListener(listener);
return this.handlerMetadata.isFromListener(listener);
}
/**
@ -55,17 +69,57 @@ public class Subscription {
* @return
*/
public boolean handlesMessageType(Class<?> messageType) {
return this.context.getHandlerMetadata().handlesMessage(messageType);
return this.handlerMetadata.handlesMessage(messageType);
}
public Class<?>[] getHandledMessageTypes(){
return this.context.getHandlerMetadata().getHandledMessages();
return this.handlerMetadata.getHandledMessages();
}
public void publish(IMessagePublication publication, Object message){
if (this.listeners.size() > 0) {
publication.markDelivered();
/**
* Delivers the given message to the given set of listeners.
* Delivery may be delayed, aborted or restricted in various ways, depending
* on the configuration of the dispatcher
*
* @param publication The message publication that initiated the dispatch
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
Method handler = this.handlerMetadata.getHandler();
for (Object listener : this.listeners) {
try {
this.invocation.invoke(listener, message, handler);
} catch (IllegalAccessException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"The class or method is not accessible",
handler, listener, message));
} catch (IllegalArgumentException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
} catch (InvocationTargetException e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"Message handler threw exception",
handler, listener, message));
} catch (Throwable e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"The handler code threw an exception",
handler, listener, message));
}
}
}
}
public void publish(IMessagePublication publication, Object message){
if(this.listeners.size() > 0) {
this.dispatcher.dispatch(publication, message, this.listeners);
private final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler handler : this.errorHandlers) {
handler.handleError(error);
}
}

View File

@ -1,59 +0,0 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.common.RuntimeProvider;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.listener.MessageHandler;
import java.util.Collection;
/**
* The subscription context holds all (meta)data/objects that are relevant to successfully publish
* a message within a subscription. A one-to-one relation between a subscription and
* subscription context holds -> a subscription context is created for each distinct subscription
* managed by the subscription manager.
*
* @author bennidi
* Date: 11/23/12
*/
public class SubscriptionContext implements RuntimeProvider {
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
private BusRuntime runtime;
public SubscriptionContext(BusRuntime runtime, MessageHandler handlerMetadata,
Collection<IPublicationErrorHandler> errorHandlers) {
this.runtime = runtime;
this.handlerMetadata = handlerMetadata;
this.errorHandlers = errorHandlers;
}
/**
* Get the meta data that specifies the characteristics of the message handler
* that is associated with this context
*
* @return
*/
public MessageHandler getHandlerMetadata() {
return handlerMetadata;
}
/**
* Get the error handlers registered with the enclosing bus.
* @return
*/
public Collection<IPublicationErrorHandler> getErrorHandlers(){
return errorHandlers;
}
@Override
public BusRuntime getRuntime() {
return runtime;
}
}

View File

@ -1,68 +0,0 @@
package net.engio.mbassy.subscription;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Collection;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.MessageDispatcher;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.listener.MessageHandler;
/**
* The subscription factory is used to create an empty subscription for specific message handler.
* The message handler's configuration is evaluated and a corresponding subscription is built.
*/
public class SubscriptionFactory {
public Subscription createSubscription(BusRuntime runtime, MessageHandler handlerMetadata) throws MessageBusException{
try {
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(BusRuntime.Properties.ErrorHandlers);
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher, new WeakConcurrentSet<Object>());
} catch (Exception e) {
throw new MessageBusException(e);
}
}
protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context) throws Exception {
IHandlerInvocation invocation = createBaseHandlerInvocation(context);
if(context.getHandlerMetadata().isSynchronized()){
invocation = new SynchronizedHandlerInvocation(invocation);
}
return invocation;
}
protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
return dispatcher;
}
protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws MessageBusException {
Class<? extends HandlerInvocation> invocation = ReflectiveHandlerInvocation.class;
if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){
throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class");
}
try {
Constructor<? extends IHandlerInvocation> constructor = invocation.getConstructor(SubscriptionContext.class);
return constructor.newInstance(context);
} catch (NoSuchMethodException e) {
throw new MessageBusException("The provided handler invocation did not specify the necessary constructor "
+ invocation.getSimpleName() + "(SubscriptionContext);", e);
} catch (Exception e) {
throw new MessageBusException("Could not instantiate the provided handler invocation "
+ invocation.getSimpleName(), e);
}
}
}

View File

@ -9,8 +9,13 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.listener.MessageHandler;
import net.engio.mbassy.listener.MetadataReader;
@ -25,8 +30,6 @@ import net.engio.mbassy.listener.MetadataReader;
*/
public class SubscriptionManager {
private static final Object setObject = new Object();
// the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader;
@ -46,19 +49,15 @@ public class SubscriptionManager {
// remember already processed classes that do not contain any message handlers
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
// synchronize read/write acces to the subscription maps
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final BusRuntime runtime;
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory, BusRuntime runtime) {
public SubscriptionManager(MetadataReader metadataReader, Collection<IPublicationErrorHandler> errorHandlers) {
this.metadataReader = metadataReader;
this.subscriptionFactory = subscriptionFactory;
this.runtime = runtime;
this.errorHandlers = errorHandlers;
}
@ -110,7 +109,19 @@ public class SubscriptionManager {
// create subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
subscriptionsByListener.add(this.subscriptionFactory.createSubscription(this.runtime, messageHandler));
try {
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (messageHandler.isSynchronized()){
invocation = new SynchronizedHandlerInvocation(invocation);
}
Subscription subscription = new Subscription(messageHandler, this.errorHandlers, invocation, new WeakConcurrentSet<Object>());
subscriptionsByListener.add(subscription);
} catch (Exception e) {
throw new MessageBusException(e);
}
}
// this will acquire a write lock and handle the case when another thread already subscribed

View File

@ -1,12 +0,0 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public class SubscriptionManagerProvider implements ISubscriptionManagerProvider {
@Override
public SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime) {
return new SubscriptionManager(reader, factory, runtime);
}
}

View File

@ -3,10 +3,10 @@ package net.engio.mbassy;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import org.junit.Test;
@ -36,7 +36,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
}
// publish in ascending order
for(Integer message : messages) {
fifoBUs.post(message).asynchronously();
fifoBUs.publishAsync(message);
}
while(fifoBUs.hasPendingMessages()) {
@ -72,7 +72,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
}
// publish in ascending order
for(Integer message : messages) {
fifoBUs.post(message).asynchronously();
fifoBUs.publishAsync(message);
}
while(fifoBUs.hasPendingMessages()) {
@ -88,50 +88,6 @@ public class AsyncFIFOBusTest extends MessageBusTest {
}
/*
@Test
public void testMultiThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
final IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
List<SyncListener> listeners = new LinkedList<SyncListener>();
for(int i = 0; i < 1000 ; i++){
SyncListener listener = new SyncListener();
listeners.add(listener);
fifoBUs.subscribe(listener);
}
// prepare set of messages in increasing order
final int[] messages = new int[10000];
for(int i = 0; i < messages.length ; i++){
messages[i] = i;
}
final AtomicInteger messageIndex = new AtomicInteger(0);
// publish in ascending order
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
int idx;
while((idx = messageIndex.getAndIncrement()) < messages.length){
fifoBUs.post(messages[idx]).asynchronously();
}
}
}, 5);
while(fifoBUs.hasPendingMessages())
pause(1000);
for(SyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
for(int i=0; i < messages.length; i++){
assertEquals(messages[i], listener.receivedSync.get(i));
}
}
} */
public static class SyncListener {
private List<Integer> receivedSync = new LinkedList<Integer>();

View File

@ -8,10 +8,10 @@ import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Synchronized;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Synchronized;
import org.junit.Test;

View File

@ -2,13 +2,13 @@ package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.listeners.ObjectListener;
@ -78,7 +78,7 @@ public class DeadMessageTest extends MessageBusTest{
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits);
// Only dead message handlers available
bus.post(new Object()).now();
bus.publish(new Object());
// The message should be caught as dead message since there are no subscribed listeners
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get());
@ -88,7 +88,7 @@ public class DeadMessageTest extends MessageBusTest{
// Add object listeners and publish again
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits);
bus.post(new Object()).now();
bus.publish(new Object());
// verify that no dead message events were produced
assertEquals(0, DeadMessagHandler.deadMessages.get());
@ -97,7 +97,7 @@ public class DeadMessageTest extends MessageBusTest{
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits);
// Only dead message handlers available
bus.post(new Object()).now();
bus.publish(new Object());
// The message should be caught, as it's the only listener
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get());

View File

@ -44,9 +44,9 @@ public class MBassadorTest extends MessageBusTest {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
bus.publish(standardMessage);
bus.publish(multipartMessage);
bus.publish(MessageTypes.Simple);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
@ -78,7 +78,7 @@ public class MBassadorTest extends MessageBusTest {
@Override
public void run() {
bus.post(MessageTypes.Simple).asynchronously();
bus.publishAsync(MessageTypes.Simple);
}
};
@ -112,7 +112,7 @@ public class MBassadorTest extends MessageBusTest {
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
bus.post(new StandardMessage()).asynchronously();
bus.publishAsync(new StandardMessage());
}
};

View File

@ -4,8 +4,8 @@ import java.io.BufferedReader;
import java.util.HashMap;
import java.util.Map;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MessageListener;
import net.engio.mbassy.listener.MetadataReader;

View File

@ -1,9 +1,9 @@
package net.engio.mbassy;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import org.junit.Test;
/**
@ -24,7 +24,7 @@ public class MethodDispatchTest extends MessageBusTest{
@Handler
public void handleString(String s) {
listener1Called = true;
MethodDispatchTest.this.listener1Called = true;
}
}
@ -33,8 +33,9 @@ public class MethodDispatchTest extends MessageBusTest{
public class EventListener2 extends EventListener1 {
// redefine handler implementation (not configuration)
@Override
public void handleString(String s) {
listener2Called = true;
MethodDispatchTest.this.listener2Called = true;
}
}
@ -44,14 +45,14 @@ public class MethodDispatchTest extends MessageBusTest{
IMessageBus bus = createBus(SyncAsync());
EventListener2 listener2 = new EventListener2();
bus.subscribe(listener2);
bus.post("jfndf").now();
assertTrue(listener2Called);
assertFalse(listener1Called);
bus.publish("jfndf");
assertTrue(this.listener2Called);
assertFalse(this.listener1Called);
EventListener1 listener1 = new EventListener1();
bus.subscribe(listener1);
bus.post("jfndf").now();
assertTrue(listener1Called);
bus.publish("jfndf");
assertTrue(this.listener1Called);
}
}

View File

@ -2,7 +2,7 @@ package net.engio.mbassy;
import java.util.Collections;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
@ -24,7 +24,6 @@ import net.engio.mbassy.messages.IMultipartMessage;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import org.junit.Test;
@ -167,7 +166,7 @@ public class SubscriptionManagerTest extends AssertSupport {
Overloading.ListenerBase.class,
Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.<IPublicationErrorHandler>emptyList());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
@ -177,22 +176,16 @@ public class SubscriptionManagerTest extends AssertSupport {
runTestWith(listeners, expectedSubscriptions);
}
private BusRuntime mockedRuntime(){
return new BusRuntime(null)
.add(BusRuntime.Properties.ErrorHandlers, Collections.EMPTY_SET)
.add(BusRuntime.Properties.AsynchronousHandlerExecutor, null);
}
private ListenerFactory listeners(Class ...listeners){
private ListenerFactory listeners(Class<?> ...listeners){
ListenerFactory factory = new ListenerFactory();
for(Class listener : listeners){
for (Class<?> listener : listeners){
factory.create(InstancesPerListener, listener);
}
return factory;
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.<IPublicationErrorHandler>emptyList());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);

View File

@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.GenericMessagePublicationSupport;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;
@ -30,12 +30,12 @@ import org.junit.Test;
public abstract class SyncBusTest extends MessageBusTest {
protected abstract GenericMessagePublicationSupport getSyncMessageBus();
protected abstract IMessageBus getSyncMessageBus();
@Test
public void testSynchronousMessagePublication() throws Exception {
final GenericMessagePublicationSupport bus = getSyncMessageBus();
final IMessageBus bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -52,10 +52,10 @@ public abstract class SyncBusTest extends MessageBusTest {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
bus.post(MessageTypes.Multipart).now();
bus.publish(standardMessage);
bus.publish(multipartMessage);
bus.publish(MessageTypes.Simple);
bus.publish(MessageTypes.Multipart);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
@ -86,7 +86,7 @@ public abstract class SyncBusTest extends MessageBusTest {
}
};
final GenericMessagePublicationSupport bus = getSyncMessageBus();
final IMessageBus bus = getSyncMessageBus();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
@ -96,7 +96,7 @@ public abstract class SyncBusTest extends MessageBusTest {
Runnable publish = new Runnable() {
@Override
public void run() {
bus.post(new StandardMessage()).now();
bus.publish(new StandardMessage());
}
};
@ -115,7 +115,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected GenericMessagePublicationSupport getSyncMessageBus() {
protected IMessageBus getSyncMessageBus() {
return new MBassador();
}
@ -125,7 +125,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected GenericMessagePublicationSupport getSyncMessageBus() {
protected IMessageBus getSyncMessageBus() {
return BusFactory.SynchronousOnly();
}
}

View File

@ -3,13 +3,13 @@ package net.engio.mbassy;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Synchronized;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Synchronized;
import org.junit.Test;
@ -41,7 +41,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
IMessagePublication publication = null;
for(int i = 0; i < numberOfMessages; i++){
publication = bus.post(new Object()).asynchronously();
publication = bus.publishAsync(new Object());
}
// wait for last publication
while (!publication.isFinished()){

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.AbstractMessage;
/**

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Listener;
import net.engio.mbassy.messages.StandardMessage;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.ICountable;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.IMessage;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.IMultipartMessage;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.MessageTypes;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.MultipartMessage;
/**

View File

@ -4,7 +4,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
public class ObjectListener {

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Listener;
import net.engio.mbassy.messages.AbstractMessage;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.messages.StandardMessage;
/**