refactorings, code optimizations, tests

This commit is contained in:
benni 2012-11-26 22:40:02 +01:00
parent 2718a4afb2
commit 89f44544bd
34 changed files with 814 additions and 505 deletions

View File

@ -48,6 +48,8 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
// all pending messages scheduled for asynchronous dispatch are queued here
private final LinkedBlockingQueue<SubscriptionDeliveryRequest<T>> pendingMessages = new LinkedBlockingQueue<SubscriptionDeliveryRequest<T>>();
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
// initialize the dispatch workers
@ -72,6 +74,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
}
public AbstractMessageBus() {
this(2);
}
@ -102,9 +105,9 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
if (listener == null) return false;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if (subscriptions == null) return false;
boolean isRemoved = false;
boolean isRemoved = true;
for (Subscription subscription : subscriptions) {
isRemoved = isRemoved || subscription.unsubscribe(listener);
isRemoved = isRemoved && subscription.unsubscribe(listener);
}
return isRemoved;
}
@ -165,24 +168,15 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : getSuperclasses(messageType)) {
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
if (subscriptionsPerMessage.get(eventSuperType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
// IMPROVEMENT: use tree list that sorts during insertion
//Collections.sort(subscriptions, new SubscriptionByPriorityDesc());
return subscriptions;
}
private Collection<Class> getSuperclasses(Class from) {
Collection<Class> superclasses = new LinkedList<Class>();
while (!from.equals(Object.class)) {
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
return superclasses;
}
// associate a suscription with a message type
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {

View File

@ -7,8 +7,8 @@ import java.util.concurrent.Executor;
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for each concrete message publication.
* A message publication is the publication of any message using one of the bus' publish(..) methods.
* The dispatch mechanism can by controlled for per message handler and message publication.
* A message publication is the publication of any message using one of the bus' publication methods.
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
@ -22,19 +22,22 @@ import java.util.concurrent.Executor;
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched.
* <p/>
* Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
* class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* Generally message handlers will be invoked in inverse sequence of subscription but any
* client using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the subscribed message handlers.
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
* a message handler may define filters to narrow the set of messages that it accepts.
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
* Any message listener may only be subscribed once -> subsequent subscriptions of an already subscribed message listener
* will be silently ignored)
* <p/>
* Removing a listener means removing all subscribed message handlers of that object. This remove operation
* immediately takes effect and on all running dispatch processes. A removed listener (a listener
* Removing a listener (unsubscribing) means removing all subscribed message handlers of that listener. This remove operation
* immediately takes effect and on all running dispatch processes -> A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
* after the remove operation completed.
*
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
@ -55,11 +58,12 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
/**
* Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the messageHandler was still subscribed).
* A call to this method passing null, an already subscribed message or any message that does not define any listeners
* will not have any effect.
* publications that have been published when the message listener was still subscribed).
*
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
* handlers will not have any effect and is silently ignored.
*
* @param listener
* @return true, if the listener was found and successfully removed
@ -91,14 +95,34 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*/
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
/**
* Get the executor service that is used to asynchronous message publication.
* The executor is passed to the message bus at creation time.
*
* @return
*/
public Executor getExecutor();
/**
* A post command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the functionality provided by the message bus that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*
*/
public static interface IPostCommand{
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
public void now();
/**
* Execute the message publication asynchronously. This call return immediately and all matching message handlers
* will be invoked in another thread.
*/
public void asynchronously();
}

View File

@ -1,13 +1,21 @@
package org.mbassy;
/**
* TODO. Insert class description here
* Publication error handlers are provided with a publication error every time an error occurs during message publication.
* A handler might fail with an exception, not be accessible because of the presence of a security manager
* or other reasons might lead to failures during the message publication process.
*
* <p/>
* @author bennidi
* Date: 2/22/12
*/
public interface IPublicationErrorHandler {
/**
* Handle the given publication error.
*
* @param error
*/
public void handleError(PublicationError error);
// This is the default error handler it will simply log to standard out and

View File

@ -6,7 +6,7 @@ import java.util.*;
import java.util.concurrent.*;
public class MBassador<T> extends AbstractMessageBus<T, SimplePostCommand<T>>{
public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>{
public MBassador(){
this(2);
@ -56,8 +56,8 @@ public class MBassador<T> extends AbstractMessageBus<T, SimplePostCommand<T>>{
@Override
public SimplePostCommand post(T message) {
return new SimplePostCommand(this, message);
public SyncAsyncPostCommand post(T message) {
return new SyncAsyncPostCommand(this, message);
}
}

View File

@ -1,18 +1,17 @@
package org.mbassy;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/12/12
* Time: 8:44 PM
* To change this template use File | Settings | File Templates.
*/
public class SimplePostCommand<T> implements IMessageBus.IPostCommand {
* This post command provides access to standard synchronous and asynchronous dispatch
*
* @author bennidi
* Date: 11/12/12
*/
public class SyncAsyncPostCommand<T> implements IMessageBus.IPostCommand {
private T message;
private MBassador mBassador;
public SimplePostCommand(MBassador mBassador, T message) {
public SyncAsyncPostCommand(MBassador mBassador, T message) {
this.mBassador = mBassador;
this.message = message;
}

View File

@ -2,6 +2,7 @@ package org.mbassy.common;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@ -50,6 +51,15 @@ public class ReflectionUtils {
return filtered;
}
public static Collection<Class> getSuperclasses(Class from) {
Collection<Class> superclasses = new LinkedList<Class>();
while (!from.equals(Object.class)) {
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
return superclasses;
}
public static boolean containsOverridingMethod(List<Method> allMethods, Method methodToCheck) {
for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) return true;

View File

@ -0,0 +1,65 @@
package org.mbassy.dispatch;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.PublicationError;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 11/23/12
*/
public class AbstractHandlerInvocation {
private MessagingContext context;
protected void handlePublicationError(PublicationError error){
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
for(IPublicationErrorHandler handler : handlers){
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener, Method handler){
try {
handler.invoke(listener, message);
}catch(IllegalAccessException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
handler, listener, message));
}
catch(IllegalArgumentException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
}
catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
handler, listener, message));
}
catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handler, listener, message));
}
}
public AbstractHandlerInvocation(MessagingContext context) {
this.context = context;
}
public MessagingContext getContext() {
return context;
}
}

View File

@ -0,0 +1,39 @@
package org.mbassy.dispatch;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.PublicationError;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously
*
* @author bennidi
* Date: 11/23/12
*/
public class AsynchronousHandlerInvocation implements IHandlerInvocation {
private IHandlerInvocation delegate;
public AsynchronousHandlerInvocation(IHandlerInvocation delegate) {
super();
this.delegate = delegate;
}
@Override
public void invoke(final Method handler, final Object listener, final Object message) {
getContext().getOwningBus().getExecutor().execute(new Runnable() {
@Override
public void run() {
delegate.invoke(handler, listener, message);
}
});
}
@Override
public MessagingContext getContext() {
return delegate.getContext();
}
}

View File

@ -0,0 +1,57 @@
package org.mbassy.dispatch;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.listener.IMessageFilter;
/**
* A dispatcher that implements message filtering based on the filter configuration
* of the associated message handler. It will delegate message delivery to another
* message dispatcher after having performed the filtering logic.
*
* @author bennidi
* Date: 11/23/12
*/
public class FilteredMessageDispatcher implements IMessageDispatcher {
private final IMessageFilter[] filter;
private IMessageDispatcher del;
public FilteredMessageDispatcher(IMessageDispatcher dispatcher) {
this.del = dispatcher;
this.filter = dispatcher.getContext().getHandlerMetadata().getFilter();
}
private boolean passesFilter(Object message) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, getContext().getHandlerMetadata())) return false;
}
return true;
}
}
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
if(passesFilter(message)){
del.dispatch(message, listeners);
}
}
@Override
public MessagingContext getContext() {
return del.getContext();
}
@Override
public IHandlerInvocation getInvocation() {
return del.getInvocation();
}
}

View File

@ -0,0 +1,31 @@
package org.mbassy.dispatch;
import java.lang.reflect.Method;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
* message handler to process a given message.
* A handler invocation might come in different flavours and can be composed
* of various independent invocations be means of delegation (decorator pattern)
*
* @author bennidi
* Date: 11/23/12
*/
public interface IHandlerInvocation {
/**
* Invoke the message delivery logic of this handler invocation
*
* @param handler The method that represents the actual message handler logic of the listener
* @param listener The listener that will receive the message
* @param message The message to be delivered to the listener
*/
public void invoke(final Method handler, final Object listener, final Object message);
/**
* Get the messaging context associated with this invocation
* @return
*/
public MessagingContext getContext();
}

View File

@ -0,0 +1,46 @@
package org.mbassy.dispatch;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.subscription.Subscription;
/**
* A message dispatcher provides the functionality to deliver a single message
* to a set of listeners. A message dispatcher uses a message context to access
* all information necessary for the message delivery.
*
* The delivery of a single message to a single listener is responsibility of the
* handler invocation object associated with the dispatcher.
*
* Implementations if IMessageDispatcher are partially designed using decorator pattern
* such that it is possible to compose different message dispatchers to achieve more complex
* dispatch logic.
*
* @author bennidi
* Date: 11/23/12
*/
public interface IMessageDispatcher {
/**
* Delivers the given message to the given set of listeners.
* Delivery may be delayed, aborted or restricted in various ways, depending
* on the configuration of the dispatcher
*
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
public void dispatch(Object message, ConcurrentSet listeners);
/**
* Get the messaging context associated with this dispatcher
*
* @return
*/
public MessagingContext getContext();
/**
* Get the handler invocation that will be used to deliver the message to each
* listener
* @return
*/
public IHandlerInvocation getInvocation();
}

View File

@ -0,0 +1,45 @@
package org.mbassy.dispatch;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.subscription.Subscription;
import java.lang.reflect.Method;
/**
* Standard implementation for direct, unfiltered message delivery.
*
* For each message delivery, this dispatcher iterates over the listeners
* and uses the previously provided handler invocation to deliver the message
* to each listener
*
* @author bennidi
* Date: 11/23/12
*/
public class MessageDispatcher implements IMessageDispatcher {
private MessagingContext context;
private IHandlerInvocation invocation;
public MessageDispatcher(MessagingContext context, IHandlerInvocation invocation) {
this.context = context;
this.invocation = invocation;
}
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
Method handler = getContext().getHandlerMetadata().getHandler();
for(Object listener: listeners){
getInvocation().invoke(handler, listener, message);
}
}
public MessagingContext getContext() {
return context;
}
@Override
public IHandlerInvocation getInvocation() {
return invocation;
}
}

View File

@ -0,0 +1,44 @@
package org.mbassy.dispatch;
import org.mbassy.IMessageBus;
import org.mbassy.listener.MessageHandlerMetadata;
/**
* The messaging context holds all data/objects that is relevant to successfully publish
* a message within a subscription. A one-to-one relation between a subscription and
* MessagingContext holds -> a messaging context is created for each distinct subscription
* that lives inside a message bus.
*
* @author bennidi
* Date: 11/23/12
*/
public class MessagingContext {
private IMessageBus owningBus;
private MessageHandlerMetadata handlerMetadata;
public MessagingContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) {
this.owningBus = owningBus;
this.handlerMetadata = handlerMetadata;
}
/**
* Get a reference to the message bus this context belongs to
* @return
*/
public IMessageBus getOwningBus() {
return owningBus;
}
/**
* Get the meta data that specifies the characteristics of the message handler
* that is associated with this context
* @return
*/
public MessageHandlerMetadata getHandlerMetadata() {
return handlerMetadata;
}
}

View File

@ -0,0 +1,21 @@
package org.mbassy.dispatch;
import java.lang.reflect.Method;
/**
* Uses reflection to invoke a message handler for a given message.
*
* @author bennidi
* Date: 11/23/12
*/
public class ReflectiveHandlerInvocation extends AbstractHandlerInvocation implements IHandlerInvocation {
public ReflectiveHandlerInvocation(MessagingContext context) {
super(context);
}
@Override
public void invoke(final Method handler, final Object listener, final Object message) {
invokeHandler(message, listener, handler);
}
}

View File

@ -12,12 +12,12 @@ import java.lang.annotation.Target;
* to the message listener or not.
*
* <p/>
* @author benni
* @author bennidi
* Date: 2/14/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.ANNOTATION_TYPE})
public @interface Filter {
Class<? extends MessageFilter> value();
Class<? extends IMessageFilter> value();
}

View File

@ -0,0 +1,54 @@
package org.mbassy.listener;
import java.util.HashMap;
import java.util.Map;
/**
* Message filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s)
*
* NOTE: A message filter must provide either a no-arg constructor.
*
* @author bennidi
* Date: 2/8/12
*/
public interface IMessageFilter {
/**
* Evaluate the message to ensure that it matches the handler configuration
*
*
* @param message the message to be delivered
* @return
*/
public boolean accepts(Object message, MessageHandlerMetadata metadata);
public static final class All implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return true;
}
}
public static final class None implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return false;
}
}
public static final class DontAllowSubtypes implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return event.getClass().equals(metadata.getDeclaredMessageType());
}
}
}

View File

@ -3,17 +3,18 @@ package org.mbassy.listener;
import java.lang.annotation.*;
/**
* TODO. Insert class description here
* <p/>
* Mark any method of any object as a message handler and configure the handler
* using different properties.
*
* @author bennidi
* Date: 2/8/12
* Time: 3:35 PM
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Listener {
Filter[] filters() default {}; // no filters by default
Mode dispatch() default Mode.Synchronous;

View File

@ -1,40 +0,0 @@
package org.mbassy.listener;
/**
* Object filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s)
*
* @author bennidi
* Date: 2/8/12
*/
public interface MessageFilter {
/**
* Evaluate the message and listener to ensure that the message should be handled by the listener
*
*
* @param event the event to be delivered
* @param listener the listener instance that would receive the event if it passes the filter
* @return
*/
public boolean accepts(Object event, Object listener);
public static final class All implements MessageFilter {
@Override
public boolean accepts(Object event, Object listener) {
return true;
}
}
public static final class None implements MessageFilter {
@Override
public boolean accepts(Object event, Object listener) {
return false;
}
}
}

View File

@ -1,12 +1,10 @@
package org.mbassy.listener;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
import org.mbassy.listener.MessageFilter;
import java.lang.reflect.Method;
/**
*
*
* @author bennidi
* Date: 11/14/12
*/
@ -14,18 +12,19 @@ public class MessageHandlerMetadata {
private Method handler;
private MessageFilter[] filter;
private IMessageFilter[] filter;
private Listener listenerConfig;
private boolean isAsynchronous = false;
public MessageHandlerMetadata(Method handler, MessageFilter[] filter, Listener listenerConfig) {
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) {
this.handler = handler;
this.filter = filter;
this.listenerConfig = listenerConfig;
this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous);
this.handler.setAccessible(true);
}
@ -34,7 +33,7 @@ public class MessageHandlerMetadata {
}
public boolean isFiltered(){
return filter == null || filter.length == 0;
return filter != null && filter.length > 0;
}
public int getPriority(){
@ -45,7 +44,11 @@ public class MessageHandlerMetadata {
return handler;
}
public MessageFilter[] getFilter() {
public IMessageFilter[] getFilter() {
return filter;
}
public Class getDeclaredMessageType(){
return handler.getParameterTypes()[0];
}
}

View File

@ -28,15 +28,15 @@ public class MetadataReader {
};
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
// retrieve all instances of filters associated with the given subscription
private MessageFilter[] getFilter(Listener subscription) throws Exception{
private IMessageFilter[] getFilter(Listener subscription) throws Exception{
if (subscription.filters().length == 0) return null;
MessageFilter[] filters = new MessageFilter[subscription.filters().length];
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
int i = 0;
for (Filter filterDef : subscription.filters()) {
MessageFilter filter = filterCache.get(filterDef.value());
IMessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
@ -51,7 +51,7 @@ public class MetadataReader {
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) throws Exception{
Listener config = messageHandler.getAnnotation(Listener.class);
MessageFilter[] filter = getFilter(config);
IMessageFilter[] filter = getFilter(config);
return new MessageHandlerMetadata(messageHandler, filter, config);
}

View File

@ -1,33 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:50 PM
* To change this template use File | Settings | File Templates.
*/
public class FilteredAsynchronousSubscription extends FilteredSubscription{
public FilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
getMessageBus().getExecutor().execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}

View File

@ -1,53 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:48 PM
* To change this template use File | Settings | File Templates.
*/
public abstract class FilteredSubscription extends Subscription{
private final MessageFilter[] filter;
public FilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
this.filter = messageHandler.getFilter();
}
private boolean passesFilter(Object message, Object listener) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, listener)) return false;
}
return true;
}
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
if(passesFilter(message, listener)) {
dispatch(message, listener);
}
}
}
}

View File

@ -1,28 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:49 PM
* To change this template use File | Settings | File Templates.
*/
public class FilteredSynchronousSubscription extends FilteredSubscription {
public FilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}

View File

@ -4,8 +4,11 @@ import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.PublicationError;
import org.mbassy.dispatch.IMessageDispatcher;
import org.mbassy.dispatch.MessagingContext;
import org.mbassy.listener.MessageHandlerMetadata;
import javax.xml.ws.handler.MessageContext;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
@ -13,81 +16,39 @@ import java.util.Comparator;
import java.util.UUID;
/**
* Subscription is a thread safe container for objects that contain message handlers
* A subscription is a thread safe container for objects that contain message handlers
*/
public abstract class Subscription {
public class Subscription {
private UUID id = UUID.randomUUID();
private final Method handler;
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
private int priority = 0;
private IMessageDispatcher dispatcher;
private IMessageBus owningBus ;
private MessagingContext context;
public Subscription(IMessageBus owningBus, MessageHandlerMetadata messageHandler) {
this.owningBus = owningBus;
this.priority = messageHandler.getPriority();
this.handler = messageHandler.getHandler();
this.handler.setAccessible(true);
public Subscription(MessagingContext context, IMessageDispatcher dispatcher) {
this.context = context;
this.dispatcher = dispatcher;
}
public abstract void publish(Object message);
protected abstract void dispatch(final Object message, final Object listener);
public void publish(Object message){
dispatcher.dispatch(message, listeners);
}
protected IMessageBus getMessageBus(){
return owningBus;
public MessagingContext getContext(){
return context;
}
public int getPriority(){
return priority;
return context.getHandlerMetadata().getPriority();
}
public void subscribe(Object o) {
listeners.add(o);
}
protected void handlePublicationError(PublicationError error){
Collection<IPublicationErrorHandler> handlers = owningBus.getRegisteredErrorHandlers();
for(IPublicationErrorHandler handler : handlers){
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener){
try {
handler.invoke(listener, message);
}catch(IllegalAccessException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
handler, listener, message));
}
catch(IllegalArgumentException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
}
catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
handler, listener, message));
}
catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handler, listener, message));
}
}
@ -95,6 +56,10 @@ public abstract class Subscription {
return listeners.remove(existingListener);
}
public int size(){
return listeners.size();
}
public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
@Override

View File

@ -2,6 +2,7 @@ package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.dispatch.*;
import org.mbassy.listener.MessageHandlerMetadata;
import java.util.Collection;
@ -17,27 +18,30 @@ public class SubscriptionFactory {
private IMessageBus owner;
public SubscriptionFactory(IMessageBus owner) {
this.owner = owner;
}
public Subscription createSubscription(MessageHandlerMetadata messageHandlerMetadata){
if(messageHandlerMetadata.isFiltered()){
if(messageHandlerMetadata.isAsynchronous()){
return new UnfilteredAsynchronousSubscription(owner, messageHandlerMetadata);
}
else{
return new UnfilteredSynchronousSubscription(owner, messageHandlerMetadata);
}
}
else{
if(messageHandlerMetadata.isAsynchronous()){
return new FilteredAsynchronousSubscription(owner, messageHandlerMetadata);
}
else{
return new FilteredSynchronousSubscription(owner, messageHandlerMetadata);
}
MessagingContext context = new MessagingContext(owner, messageHandlerMetadata);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher);
}
protected IHandlerInvocation buildInvocationForHandler(MessagingContext context){
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context);
if(context.getHandlerMetadata().isAsynchronous()){
invocation = new AsynchronousHandlerInvocation(invocation);
}
return invocation;
}
protected IMessageDispatcher buildDispatcher(MessagingContext context, IHandlerInvocation invocation){
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
if(context.getHandlerMetadata().isFiltered()){
dispatcher = new FilteredMessageDispatcher(dispatcher);
}
return dispatcher;
}
}

View File

@ -1,33 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:48 PM
* To change this template use File | Settings | File Templates.
*/
public class UnfilteredAsynchronousSubscription extends UnfilteredSubscription {
public UnfilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
getMessageBus().getExecutor().execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}

View File

@ -1,34 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:45 PM
* To change this template use File | Settings | File Templates.
*/
public abstract class UnfilteredSubscription extends Subscription{
public UnfilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
dispatch(message, listener);
}
}
}

View File

@ -1,27 +0,0 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/14/12
* Time: 3:49 PM
* To change this template use File | Settings | File Templates.
*/
public class UnfilteredSynchronousSubscription extends UnfilteredSubscription{
public UnfilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}

View File

@ -0,0 +1,19 @@
package org.mbassy;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
* Test suite for running all available unit tests
*
* @author bennidi
* Date: 11/23/12
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
ConcurrentSetTest.class,
MBassadorTest.class,
FilterTest.class
})
public class AllTests {
}

View File

@ -9,13 +9,17 @@ import java.util.LinkedList;
import java.util.Random;
/**
* Created with IntelliJ IDEA.
* This test ensures the correct behaviour of the set implementation that is the building
* block of the subscription implementations used by the Mbassador message bus.
*
* It should behave exactly like other set implementations do and as such all tests are based
* on comparing the outcome of sequence of operations applied to a standard set implementation
* and the concurrent set.
*
* @author bennidi
* Date: 11/12/12
* Time: 3:02 PM
* To change this template use File | Settings | File Templates.
* Date: 11/12/12
*/
public class ConcurrentSetTest extends UnitTest{
public class ConcurrentSetTest extends UnitTest {
private int numberOfElements = 100000;
@ -23,27 +27,27 @@ public class ConcurrentSetTest extends UnitTest{
@Test
public void testIterator(){
final HashSet<Object> distinct = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
public void testIteratorCleanup() {
final HashSet<Object> persistingCandidates = new HashSet<Object>();
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
Random rand = new Random();
for(int i=0;i < numberOfElements ; i++){
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
if(rand.nextInt() % 3 == 0){
distinct.add(candidate);
if (rand.nextInt() % 3 == 0) {
persistingCandidates.add(candidate);
}
target.add(candidate);
testSet.add(candidate);
}
// this will remove all objects that have not been inserted into the set of persisting candidates
runGC();
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : target){
for (Object testObject : testSet) {
// do nothing
// just iterate to trigger automatic clean up
System.currentTimeMillis();
@ -51,139 +55,165 @@ public class ConcurrentSetTest extends UnitTest{
}
}, numberOfThreads);
for(Object tar : target){
Assert.assertTrue(distinct.contains(tar));
assertEquals(persistingCandidates.size(), testSet.size());
for (Object test : testSet) {
assertTrue(persistingCandidates.contains(test));
}
}
@Test
public void testInsert(){
@Test
public void testUniqueness() {
final LinkedList<Object> duplicates = new LinkedList<Object>();
final HashSet<Object> distinct = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
Random rand = new Random();
// build set of distinct objects and list of duplicates
Object candidate = new Object();
for(int i=0;i < numberOfElements ; i++){
if(rand.nextInt() % 3 == 0){
for (int i = 0; i < numberOfElements; i++) {
if (rand.nextInt() % 3 == 0) {
candidate = new Object();
}
duplicates.add(candidate);
distinct.add(candidate);
}
// insert all elements (containing duplicates) into the set
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : duplicates){
target.add(src);
for (Object src : duplicates) {
testSet.add(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
Assert.assertTrue(distinct.contains(tar));
// check that the control set and the test set contain the exact same elements
assertEquals(distinct.size(), testSet.size());
for (Object uniqueObject : distinct) {
assertTrue(testSet.contains(uniqueObject));
}
for(Object src : distinct){
Assert.assertTrue(target.contains(src));
}
@Test
public void testPerformance(){
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> hashSet = new HashSet<Object>();
final ConcurrentSet<Object> concurrentSet = new ConcurrentSet<Object>();
for (int i = 0; i < 1000000; i++) {
source.add(new Object());
}
Assert.assertEquals(distinct.size(), target.size());
long start = System.currentTimeMillis();
for(Object o: source){
hashSet.add(o);
}
long duration = System.currentTimeMillis() - start;
System.out.println("Performance of HashSet for 1.000.000 object insertions " + duration);
start = System.currentTimeMillis();
for(Object o: source){
concurrentSet.add(o);
}
duration = System.currentTimeMillis() - start;
System.out.println("Performance of ConcurrentSet for 1.000.000 object insertions " + duration);
}
@Test
public void testRemove1(){
public void testRemove2() {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
for(int i=0;i < numberOfElements ; i++){
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
// build set of distinct objects and mark a subset of those for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
if(i % 3 == 0){
if (i % 3 == 0) {
toRemove.add(candidate);
}
}
// build the test set from the set of candidates
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : source){
target.add(src);
for (Object src : source) {
testSet.add(src);
}
}
}, numberOfThreads);
// remove all candidates that have previously been marked for removal from the test set
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : toRemove){
target.remove(src);
for (Object src : toRemove) {
testSet.remove(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar));
}
for(Object src : source){
if(!toRemove.contains(src))Assert.assertTrue(target.contains(src));
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
}
}
@Test
public void testRemove2(){
public void testRemoval() {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
for(int i=0;i < numberOfElements ; i++){
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
// build set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
if(i % 3 == 0){
if (i % 3 == 0) {
toRemove.add(candidate);
}
}
// build test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : source){
target.add(src);
if(toRemove.contains(src))
target.remove(src);
for (Object src : source) {
testSet.add(src);
if (toRemove.contains(src))
testSet.remove(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar));
}
for(Object src : source){
if(!toRemove.contains(src))Assert.assertTrue(target.contains(src));
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
}
}

View File

@ -0,0 +1,57 @@
package org.mbassy;
import org.junit.Test;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Filter;
import org.mbassy.listener.IMessageFilter;
import org.mbassy.listener.Listener;
import org.mbassy.listeners.*;
import java.util.List;
/**
* Testing of filter functionality
*
* @author bennidi
* Date: 11/26/12
*/
public class FilterTest extends UnitTest{
@Test
public void testSubclassFilter() throws Exception {
MBassador bus = new MBassador();
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subTestEvent = new SubTestEvent();
bus.post(event).now();
bus.post(subTestEvent).now();
assertEquals(100, event.counter.get());
assertEquals(0, subTestEvent.counter.get());
}
public static class FilteredMessageListener{
@Listener(filters = {@Filter(IMessageFilter.DontAllowSubtypes.class)})
public void handleTestEvent(TestEvent event){
event.counter.incrementAndGet();
}
}
}

View File

@ -4,8 +4,11 @@ import org.junit.Test;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listeners.*;
import org.mbassy.subscription.Subscription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -17,27 +20,62 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public class MBassadorTest extends UnitTest {
// this is a single threaded test for subscribing and unsubscribing of a single listener
@Test
public void testSubscribeSimple() throws InterruptedException {
MBassador bus = new MBassador();
List<Object> listeners = new LinkedList<Object>();
int listenerCount = 1000;
// subscribe a number of listeners to the bus
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean listener = new EventingTestBean();
NonListeningBean nonListener = new NonListeningBean();
listeners.add(listener);
bus.subscribe(listener);
bus.subscribe(nonListener);
assertTrue(bus.unsubscribe(listener));
assertFalse(bus.unsubscribe(nonListener));
assertFalse(bus.unsubscribe(nonListener)); // these are not expected to be subscribed listeners
assertFalse(bus.unsubscribe(new EventingTestBean()));
}
// check the generated subscriptions for existence of all previously subscribed valid listeners
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
assertEquals(1, testEventsubscriptions.size());
assertEquals(listenerCount, getNumberOfSubscribedListeners(testEventsubscriptions));
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
assertEquals(3, subTestEventsubscriptions.size());
assertEquals(3 * listenerCount, getNumberOfSubscribedListeners(subTestEventsubscriptions));
// unsubscribe the listeners
for(Object listener : listeners){
assertTrue(bus.unsubscribe(listener)); // this listener is expected to exist
}
// no listener should be left
testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
assertEquals(1, testEventsubscriptions.size());
assertEquals(0, getNumberOfSubscribedListeners(testEventsubscriptions));
subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
assertEquals(3, subTestEventsubscriptions.size());
assertEquals(0, getNumberOfSubscribedListeners(subTestEventsubscriptions));
}
private int getNumberOfSubscribedListeners(Collection<Subscription> subscriptions) {
int listeners = 0;
for (Subscription sub : subscriptions) {
listeners += sub.size();
}
return listeners;
}
@Test
public void testSubscribeConcurrent() throws Exception {
public void testConcurrentSubscription() throws Exception {
MBassador bus = new MBassador();
ListenerFactory listenerFactory = new ListenerFactory()
@ -48,34 +86,37 @@ public class MBassadorTest extends UnitTest {
.create(100, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
// check the generated subscriptions for existence of all previously subscribed valid listeners
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
assertEquals(3, testEventsubscriptions.size());
assertEquals(300, getNumberOfSubscribedListeners(testEventsubscriptions));
bus.publish(event);
bus.publish(subEvent);
pause(4000);
assertEquals(300, event.counter.get());
assertEquals(700, subEvent.counter.get());
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
assertEquals(10, subTestEventsubscriptions.size());
assertEquals(1000, getNumberOfSubscribedListeners(subTestEventsubscriptions));
}
@Test
public void testAsynchronous() throws InterruptedException {
public void testAsynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador();
int listenerCount = 1000;
List<EventingTestBean> persistentReferences = new ArrayList();
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
.create(100, EventingTestBean3.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
@ -85,64 +126,64 @@ public class MBassadorTest extends UnitTest {
pause(2000);
assertTrue(event.counter.get() == 1000);
assertTrue(subEvent.counter.get() == 1000 * 2);
assertEquals(300, event.counter.get());
assertEquals(700, subEvent.counter.get());
}
@Test
public void testSynchronous() throws InterruptedException {
public void testSynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador();
int listenerCount = 10;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
.create(100, EventingTestBean3.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publish(event);
bus.publish(subEvent);
bus.publish(event);
bus.publish(subEvent);
assertEquals(i, event.counter.get());
pause(2000);
pause(50);
assertEquals(i * 2, subEvent.counter.get());
}
assertEquals(300, event.counter.get());
assertEquals(700, subEvent.counter.get());
}
@Test
public void testConcurrentPublication() {
final MBassador bus = new MBassador();
final int listenerCount = 100;
final int concurrency = 20;
public void testConcurrentMixedMessagePublication() throws Exception {
final CopyOnWriteArrayList<TestEvent> testEvents = new CopyOnWriteArrayList<TestEvent>();
final CopyOnWriteArrayList<SubTestEvent> subtestEvents = new CopyOnWriteArrayList<SubTestEvent>();
final CopyOnWriteArrayList<EventingTestBean> persistentReferences = new CopyOnWriteArrayList<EventingTestBean>();
final int eventLoopsPerTHread = 100;
final MBassador bus = new MBassador();
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
.create(100, EventingTestBean3.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for (int i = 0; i < listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
}
}, concurrency);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for (int i = 0; i < listenerCount; i++) {
for (int i = 0; i < eventLoopsPerTHread; i++) {
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
testEvents.add(event);
@ -152,16 +193,16 @@ public class MBassadorTest extends UnitTest {
bus.publish(subEvent);
}
}
}, concurrency);
}, 10);
pause(3000);
for (TestEvent event : testEvents) {
assertEquals(listenerCount * concurrency, event.counter.get());
assertEquals(300, event.counter.get());
}
for (SubTestEvent event : subtestEvents) {
assertEquals(listenerCount * concurrency * 2, event.counter.get());
assertEquals(700, event.counter.get());
}
}

View File

@ -3,8 +3,8 @@ package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Filter;
import org.mbassy.listener.IMessageFilter;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.Mode;
/**
@ -33,7 +33,7 @@ public class EventingTestBean {
@Listener(
priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(MessageFilter.None.class), @Filter(MessageFilter.All.class)})
filters = {@Filter(IMessageFilter.None.class), @Filter(IMessageFilter.All.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
}

View File

@ -7,7 +7,7 @@ import org.mbassy.listener.Mode;
/**
* This bean overrides all the handlers defined in its superclass. Since it does not specify any annotations
* it should be considered a message lister
* it should not be considered a message listener
*
* @author bennidi
* Date: 11/22/12