release 1.1.10

This commit is contained in:
Benjamin Diedrichsen 2014-04-02 09:45:04 +02:00
parent 54f7bd72b9
commit 4034024e30
44 changed files with 420 additions and 136 deletions

View File

@ -157,6 +157,16 @@ to avoid confusion and misunderstanding.
<h2>Release Notes</h2>
<h3>1.1.10</h3>
+ Fixed broken sort order of prioritized handlers (see #58)
+ Addressed issue #63 by making the constructor of `MessageHandler` use a map of properties and by replacing dependencies to
all MBassador specific annotations with Java primitives and simple interfaces
+ Small refactorings (moved stuff around to have cleaner packaging)
+ MessageBus.getExecutor() is now deprecated and will be removed with next release -> use the runtime to get access to it.
+ Introduced BusFactory with convenience methods for creating bus instances for different message dispatching scenarios like
asynchronous FIFO (asynchronous message publications guaranteed to be delivered in the order they occurred)
+ Renamed runtime property of `BusRuntime` "handler.async-service" to "handler.async.executor"
<h3>1.1.9</h3>
+ Fixed memory leak reported in issue #53

View File

@ -18,5 +18,6 @@ Add code examples Javadoc of main classes
Describe 1-Thread FIFO scheme with async dispatch
Explain how MBassador can be extended easily using delegation
Refer to Spring integration component
Creating bus hierarchies
How to make sender part of the message publication
How to add global filtering by means of delegation

View File

@ -1,9 +1,10 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
@ -17,7 +18,7 @@ import java.util.List;
*
* @param <T>
*/
public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>{
public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
// this handler will receive all errors that occur during message dispatch or message handling
@ -32,7 +33,7 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>{
public AbstractPubSubSupport(ISyncBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers());
this.subscriptionManager = configuration.getSubscriptionManagerProvider()
.createManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory(), runtime);

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
import java.util.ArrayList;
@ -12,7 +13,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The base class for all async message bus implementations.
* The base class for all message bus implementations with support for asynchronous message dispatch
*
* @param <T>
* @param <P>
@ -32,7 +33,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
super(configuration);
this.executor = configuration.getExecutorForAsynchronousHandlers();
getRuntime().add("handler.async-service", executor);
getRuntime().add(BusRuntime.Properties.AsynchronousHandlerExecutor, executor);
pendingMessages = configuration.getPendingMessagesQueue();
dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
initDispatcherThreads(configuration);
@ -61,6 +62,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
}
}
});
dispatcher.setName("Message dispatcher");
dispatchers.add(dispatcher);
dispatcher.start();
}
@ -92,8 +94,8 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
@Override
protected void finalize() throws Throwable {
shutdown();
super.finalize();
shutdown();
}
@Override

View File

@ -0,0 +1,38 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.SyncBusConfiguration;
/**
* The bus factory provides convenient factory methods for the most common bus use cases.
*
* @author bennidi
* Date: 3/30/14
*/
public class BusFactory {
/**
* Create a message bus supporting only synchronous message publication.
* All message publications will run in the calling thread, no bus internal
* multi-threading will occur.
*
* @return
*/
public static ISyncMessageBus SynchronousOnly(){
return new SyncMessageBus(new SyncBusConfiguration());
}
/**
* Create a message bus supporting synchronous and asynchronous message publication.
* Asynchronous message publication will be handled by a single thread such that FIFO
* order of message processing is guaranteed.
*
*
* @return
*/
public static IMessageBus AsynchronousSequentialFIFO(){
return new MBassador(BusConfiguration.Default(1,1,1));
}
}

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.common.MissingPropertyException;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.error.MissingPropertyException;
import java.util.Collection;
import java.util.HashMap;
@ -17,6 +18,13 @@ import java.util.Map;
*/
public class BusRuntime {
public static class Properties{
public static final String ErrorHandlers = "error.handlers";
public static final String AsynchronousHandlerExecutor = "handler.async.executor";
}
private PubSubSupport provider;
private Map<String, Object> properties = new HashMap<String, Object>();

View File

@ -1,19 +0,0 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
import java.util.concurrent.TimeUnit;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 8/21/13
* Time: 11:05 AM
* To change this template use File | Settings | File Templates.
*/
public interface IMBassador<T> extends IMessageBus<T, SyncAsyncPostCommand<T>> {
MessagePublication publishAsync(T message);
MessagePublication publishAsync(T message, long timeout, TimeUnit unit);
}

View File

@ -1,25 +1,24 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
import java.util.concurrent.TimeUnit;
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> implements IMBassador<T> {
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> implements IMessageBus<T, SyncAsyncPostCommand<T>> {
public MBassador(BusConfiguration configuration) {
super(configuration);
}
@Override
public MessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));
}
@Override
public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.FilteredMessage;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -79,11 +79,11 @@ public class MessagePublication {
public boolean isDeadEvent() {
return DeadMessage.class.isAssignableFrom(message.getClass());
return DeadMessage.class.equals(message.getClass());
}
public boolean isFilteredEvent() {
return FilteredMessage.class.isAssignableFrom(message.getClass());
return FilteredMessage.class.equals(message.getClass());
}
public Object getMessage() {

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
@ -9,7 +10,7 @@ import net.engio.mbassy.bus.publication.IPublicationCommand;
* will not create any new threads.
*
*/
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements ISyncMessageBus<T, SyncMessageBus.SyncPostCommand>{
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements ISyncMessageBus<T, SyncMessageBus.SyncPostCommand> {
public SyncMessageBus(ISyncBusConfiguration configuration) {

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.common;
package net.engio.mbassy.bus.common;
/**
* The DeadEvent is delivered to all subscribed handlers (if any) whenever no message
* The dead message event is published whenever no message
* handlers could be found for a given message publication.
*
* @author bennidi

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import java.util.Collection;

View File

@ -1,14 +1,14 @@
package net.engio.mbassy.common;
package net.engio.mbassy.bus.common;
/**
* A filtered event is published when there have been matching subscriptions for a given
* A filtered message event is published when there have been matching subscriptions for a given
* message publication but configured filters prevented the message from being delivered to
* any of the handlers.
*
* @author bennidi
* Date: 3/1/13
*/
public class FilteredMessage extends PublicationEvent {
public final class FilteredMessage extends PublicationEvent {
public FilteredMessage(Object event) {

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.IPublicationCommand;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;
@ -70,14 +70,16 @@ public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand>
* Get the executor service that is used for asynchronous message publications.
* The executor is passed to the message bus at creation time.
*
* Note: The executor can be obtained from the run time. See
* @return
*/
@Deprecated
Executor getExecutor();
/**
* Check whether any asynchronous message publications are pending to be processed
*
* @return
* @return true if any unfinished message publications are found
*/
boolean hasPendingMessages();

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.publication.IPublicationCommand;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
/**
* This interface defines the very basic message publication semantics according to the publish subscribe pattern.

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.common;
package net.engio.mbassy.bus.common;
/**
* A wrapped event is created when various conditions are matched (these depend on the concrete
@ -9,13 +9,13 @@ package net.engio.mbassy.common;
*/
public abstract class PublicationEvent {
private Object event;
private Object relatedMessage;
public PublicationEvent(Object message) {
this.event = message;
this.relatedMessage = message;
}
public Object getMessage() {
return event;
return relatedMessage;
}
}

View File

@ -1,4 +1,6 @@
package net.engio.mbassy.bus;
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.BusRuntime;
/**
* Each message bus provides a runtime object to access its dynamic features and runtime configuration.

View File

@ -175,8 +175,9 @@ public class BusConfiguration implements IBusConfiguration {
return metadataReader;
}
public void setMetadataReader(MetadataReader metadataReader) {
public BusConfiguration setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return this;
}
@Override
@ -193,7 +194,8 @@ public class BusConfiguration implements IBusConfiguration {
return subscriptionManagerProvider;
}
public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
public BusConfiguration setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return this;
}
}

View File

@ -5,8 +5,16 @@ import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
/**
* The configuration options for the synchronous message bus {@link net.engio.mbassy.bus.SyncMessageBus}
*/
public interface ISyncBusConfiguration {
/**
* The message publication factory is used to wrap a published message
* and while it is being processed
* @return The factory to be used by the bus to create the publications
*/
MessagePublication.Factory getMessagePublicationFactory();
MetadataReader getMetadataReader();

View File

@ -1,4 +1,4 @@
package net.engio.mbassy;
package net.engio.mbassy.bus.error;
/**
* Publication error handlers are provided with a publication error every time an

View File

@ -1,4 +1,4 @@
package net.engio.mbassy;
package net.engio.mbassy.bus.error;
/**
* Todo: Add javadoc

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.common;
package net.engio.mbassy.bus.error;
/**
* This exception is thrown when a property value that is unavailable at runtime is accessed.

View File

@ -1,4 +1,4 @@
package net.engio.mbassy;
package net.engio.mbassy.bus.error;
import net.engio.mbassy.bus.MessagePublication;

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import java.util.concurrent.ExecutorService;
@ -19,7 +20,7 @@ public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAw
public AsynchronousHandlerInvocation(IHandlerInvocation delegate) {
super(delegate.getContext());
this.delegate = delegate;
this.executor = delegate.getContext().getRuntime().get("handler.async-service");
this.executor = delegate.getContext().getRuntime().get(BusRuntime.Properties.AsynchronousHandlerExecutor);
}
/**

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.lang.reflect.InvocationTargetException;

View File

@ -3,30 +3,80 @@ package net.engio.mbassy.listener;
import net.engio.mbassy.dispatch.HandlerInvocation;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler defines the message listener and more generally, any class containing a message handler in its class hierarchy
* defines a message listener.
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
* defines such a message listener.
*
* @author bennidi
* Date: 11/14/12
*/
public class MessageHandler {
public static final class Properties{
public static final String HandlerMethod = "handler";
public static final String InvocationMode = "invocationMode";
public static final String Filter = "filter";
public static final String Enveloped = "envelope";
public static final String HandledMessages = "messages";
public static final String IsSynchronized = "synchronized";
public static final String Listener = "listener";
public static final String AcceptSubtypes = "subtypes";
public static final String Priority = "priority";
public static final String Invocation = "invocation";
/**
* Create the property map for the {@link MessageHandler} constructor using the default objects.
*
* @param handler The handler annotated method of the listener
* @param handlerConfig The annotation that configures the handler
* @param filter The set of preconfigured filters if any
* @param listenerConfig The listener metadata
* @return A map of properties initialized from the given parameters that will conform to the requirements of the
* {@link MessageHandler} constructor. See {@see MessageHandler.validate()} for more details.
*/
public static final Map<String, Object> Create(Method handler, Handler handlerConfig, IMessageFilter[] filter, MessageListener listenerConfig){
if(handler == null){
throw new IllegalArgumentException("The message handler configuration may not be null");
}
net.engio.mbassy.listener.Enveloped enveloped = handler.getAnnotation(Enveloped.class);
Class[] handledMessages = enveloped != null
? enveloped.messages()
: handler.getParameterTypes();
handler.setAccessible(true);
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(HandlerMethod, handler);
properties.put(Filter, filter != null ? filter : new IMessageFilter[]{});
properties.put(Priority, handlerConfig.priority());
properties.put(Invocation, handlerConfig.invocation());
properties.put(InvocationMode, handlerConfig.delivery());
properties.put(Enveloped, enveloped != null);
properties.put(AcceptSubtypes, !handlerConfig.rejectSubtypes());
properties.put(Listener, listenerConfig);
properties.put(IsSynchronized, handler.getAnnotation(Synchronized.class) != null);
properties.put(HandledMessages, handledMessages);
return properties;
}
}
private final Method handler;
private final IMessageFilter[] filter;
private final Handler handlerConfig;
private final int priority;
private final boolean isAsynchronous;
private final Class<? extends HandlerInvocation> invocation;
private final Enveloped envelope;
private final Invoke invocationMode;
private final List<Class<?>> handledMessages = new LinkedList<Class<?>>();
private final boolean isEnvelope;
private final Class[] handledMessages;
private final boolean acceptsSubtypes;
@ -34,29 +84,40 @@ public class MessageHandler {
private final boolean isSynchronized;
private Class listeningClass;
public MessageHandler(Map<String, Object> properties){
super();
validate(properties);
this.handler = (Method)properties.get(Properties.HandlerMethod);
this.filter = (IMessageFilter[])properties.get(Properties.Filter);
this.priority = (Integer)properties.get(Properties.Priority);
this.invocation = (Class<? extends HandlerInvocation>)properties.get(Properties.Invocation);
this.invocationMode = (Invoke)properties.get(Properties.InvocationMode);
this.isEnvelope = (Boolean)properties.get(Properties.Enveloped);
this.acceptsSubtypes = (Boolean)properties.get(Properties.AcceptSubtypes);
this.listenerConfig = (MessageListener)properties.get(Properties.Listener);
this.isSynchronized = (Boolean)properties.get(Properties.IsSynchronized);
this.handledMessages = (Class[])properties.get(Properties.HandledMessages);
}
private void validate(Map<String, Object> properties){
Object[][] expectedProperties = new Object[][]{
new Object[]{Properties.HandlerMethod, Method.class },
new Object[]{Properties.Priority, Integer.class },
new Object[]{Properties.Invocation, Class.class },
new Object[]{Properties.Filter, IMessageFilter[].class },
new Object[]{Properties.Enveloped, Boolean.class },
new Object[]{Properties.HandledMessages, Class[].class },
new Object[]{Properties.IsSynchronized, Boolean.class },
new Object[]{Properties.Listener, MessageListener.class },
new Object[]{Properties.AcceptSubtypes, Boolean.class }
};
for(Object[] property : expectedProperties){
if (properties.get(property[0]) == null || !((Class)property[1]).isAssignableFrom(properties.get(property[0]).getClass()))
throw new IllegalArgumentException("Property " + property[0] + " was expected to be not null and of type " + property[1]
+ " but was: " + properties.get(property[0]));
}
public MessageHandler(Method handler, IMessageFilter[] filter, Handler handlerConfig, MessageListener listenerConfig) {
if(handler == null || handlerConfig == null){
throw new IllegalArgumentException("The message handler configuration may not be null");
}
this.handler = handler;
this.filter = filter;
this.handlerConfig = handlerConfig;
this.isAsynchronous = handlerConfig.delivery().equals(Invoke.Asynchronously);
this.envelope = handler.getAnnotation(Enveloped.class);
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
this.listenerConfig = listenerConfig;
this.isSynchronized = handler.getAnnotation(Synchronized.class) != null;
if (this.envelope != null) {
for(Class messageType : envelope.messages()){
handledMessages.add(messageType);
}
} else {
handledMessages.add(handler.getParameterTypes()[0]);
}
this.handler.setAccessible(true);
}
public boolean isSynchronized(){
@ -72,15 +133,15 @@ public class MessageHandler {
}
public boolean isAsynchronous() {
return isAsynchronous;
return invocationMode.equals(Invoke.Asynchronously);
}
public boolean isFiltered() {
return filter != null && filter.length > 0;
return filter.length > 0;
}
public int getPriority() {
return handlerConfig.priority();
return priority;
}
public Method getHandler() {
@ -91,16 +152,16 @@ public class MessageHandler {
return filter;
}
public List<Class<?>> getHandledMessages() {
public Class[] getHandledMessages() {
return handledMessages;
}
public boolean isEnveloped() {
return envelope != null;
return isEnvelope;
}
public Class<? extends HandlerInvocation> getHandlerInvocation(){
return handlerConfig.invocation();
return invocation;
}
public boolean handlesMessage(Class<?> messageType) {
@ -119,8 +180,4 @@ public class MessageHandler {
return acceptsSubtypes;
}
public boolean isEnabled() {
return handlerConfig.enabled();
}
}

View File

@ -76,8 +76,9 @@ public class MetadataReader {
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
// if a handler is overwritten it inherits the configuration of its parent method
MessageHandler handlerMetadata = new MessageHandler(overriddenHandler == null ? handler : overriddenHandler,
getFilter(handlerConfig), handlerConfig, listenerMetadata);
Map<String, Object> handlerProperties = MessageHandler.Properties.Create(overriddenHandler == null ? handler : overriddenHandler,
handlerConfig, getFilter(handlerConfig), listenerMetadata);
MessageHandler handlerMetadata = new MessageHandler(handlerProperties);
listenerMetadata.addHandler(handlerMetadata);
}

View File

@ -5,7 +5,6 @@ import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
/**
@ -64,7 +63,7 @@ public class Subscription {
return context.getHandlerMetadata().handlesMessage(messageType);
}
public List<Class<?>> getHandledMessageTypes(){
public Class[] getHandledMessageTypes(){
return context.getHandlerMetadata().getHandledMessages();
}

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.RuntimeProvider;
import net.engio.mbassy.bus.common.RuntimeProvider;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.listener.MessageHandler;
import java.util.Collection;

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.MessageBusException;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.*;
@ -18,11 +18,9 @@ import java.util.Collection;
*/
public class SubscriptionFactory {
private static final String ErrorHandlers = "error.handlers";
public Subscription createSubscription(BusRuntime runtime, MessageHandler handlerMetadata) throws MessageBusException{
try {
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(ErrorHandlers);
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(BusRuntime.Properties.ErrorHandlers);
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);

View File

@ -21,7 +21,8 @@ import org.junit.runners.Suite;
MethodDispatchTest.class,
DeadMessageTest.class,
SynchronizedHandlerTest.class,
SubscriptionManagerTest.class
SubscriptionManagerTest.class,
AsyncFIFOBusTest.class
})
public class AllTests {
}

View File

@ -0,0 +1,162 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
/**
*
* @author bennidi
* Date: 3/30/14
*/
public class AsyncFIFOBusTest extends MessageBusTest {
@Test
public void testSingleThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
List<SyncListener> listeners = new LinkedList<SyncListener>();
for(int i = 0; i < 1000 ; i++){
SyncListener listener = new SyncListener();
listeners.add(listener);
fifoBUs.subscribe(listener);
}
// prepare set of messages in increasing order
int[] messages = new int[1000];
for(int i = 0; i < messages.length ; i++){
messages[i] = i;
}
// publish in ascending order
for(Integer message : messages)
fifoBUs.post(message).asynchronously();
while(fifoBUs.hasPendingMessages())
pause(1000);
for(SyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
for(int i=0; i < messages.length; i++){
assertEquals(messages[i], listener.receivedSync.get(i));
}
}
}
// NOTE: Can fail due to timing issues.
@Test
public void testSingleThreadedSyncAsyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
List<SyncAsyncListener> listeners = new LinkedList<SyncAsyncListener>();
for(int i = 0; i < 1000 ; i++){
SyncAsyncListener listener = new SyncAsyncListener();
listeners.add(listener);
fifoBUs.subscribe(listener);
}
// prepare set of messages in increasing order
int[] messages = new int[1000];
for(int i = 0; i < messages.length ; i++){
messages[i] = i;
}
// publish in ascending order
for(Integer message : messages)
fifoBUs.post(message).asynchronously();
while(fifoBUs.hasPendingMessages())
pause(2000);
for(SyncAsyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
assertEquals(listener.receivedSync.size(), listener.receivedAsync.size());
for(int i=0; i < listener.receivedAsync.size(); i++){
assertEquals(messages[i], listener.receivedSync.get(i));
// sync and async in same order
assertEquals(listener.receivedSync.get(i), listener.receivedAsync.get(i));
}
}
}
/*
@Test
public void testMultiThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners
final IMessageBus fifoBUs = BusFactory.AsynchronousSequentialFIFO();
List<SyncListener> listeners = new LinkedList<SyncListener>();
for(int i = 0; i < 1000 ; i++){
SyncListener listener = new SyncListener();
listeners.add(listener);
fifoBUs.subscribe(listener);
}
// prepare set of messages in increasing order
final int[] messages = new int[10000];
for(int i = 0; i < messages.length ; i++){
messages[i] = i;
}
final AtomicInteger messageIndex = new AtomicInteger(0);
// publish in ascending order
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
int idx;
while((idx = messageIndex.getAndIncrement()) < messages.length){
fifoBUs.post(messages[idx]).asynchronously();
}
}
}, 5);
while(fifoBUs.hasPendingMessages())
pause(1000);
for(SyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
for(int i=0; i < messages.length; i++){
assertEquals(messages[i], listener.receivedSync.get(i));
}
}
} */
public static class SyncListener {
private List<Integer> receivedSync = new LinkedList<Integer>();
@Handler
public void handleSync(Integer message){
receivedSync.add(message);
}
}
public static class SyncAsyncListener {
private List<Integer> receivedSync = new LinkedList<Integer>();
private List<Integer> receivedAsync = new LinkedList<Integer>();
@Handler
public void handleSync(Integer message){
receivedSync.add(message);
}
@Handler(delivery = Invoke.Asynchronously)
public void handleASync(Integer message){
receivedAsync.add(message);
}
}
}

View File

@ -1,11 +1,14 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.*;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.listeners.ObjectListener;
import org.junit.Before;

View File

@ -1,8 +1,12 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.*;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;

View File

@ -1,7 +1,9 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.messages.MessageTypes;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;

View File

@ -214,8 +214,8 @@ public class SubscriptionManagerTest extends AssertSupport {
private BusRuntime mockedRuntime(){
return new BusRuntime(null)
.add("error.handlers", Collections.EMPTY_SET)
.add("handler.async-service", null);
.add(BusRuntime.Properties.ErrorHandlers, Collections.EMPTY_SET)
.add(BusRuntime.Properties.AsynchronousHandlerExecutor, null);
}
private ListenerFactory listeners(Class ...listeners){

View File

@ -1,10 +1,11 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.SyncMessageBus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.SyncBusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
@ -17,7 +18,6 @@ import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import org.junit.Assert;
import org.junit.Test;
@ -188,7 +188,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new SyncMessageBus(new SyncBusConfiguration());
return BusFactory.SynchronousOnly();
}
}

View File

@ -1,8 +1,8 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
@ -66,6 +66,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
bus.post(new Object()).asynchronously();
}
// TODO: wait for publication
pause(10000);
for(SynchronizedWithAsynchronousDelivery handler : handlers){

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.messages.MessageTypes;
import org.junit.Before;

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.common;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.PubSubSupport;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.Iterator;