Simplifying/pruning mbassador functionality and complexity

This commit is contained in:
nathan 2015-02-03 18:49:43 +01:00
parent f509bfe57f
commit d17183d592
53 changed files with 543 additions and 2141 deletions

View File

@ -34,7 +34,5 @@ public interface IMessagePublication {
public boolean isDeadEvent();
public boolean isFilteredEvent();
public Object getMessage();
}

View File

@ -1,11 +1,10 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription;
/**
* A message publication is created for each asynchronous message dispatch. It reflects the state
* of the corresponding message publication process, i.e. provides information whether the
@ -33,64 +32,66 @@ public class MessagePublication implements IMessagePublication {
this.state = initialState;
}
@Override
public boolean add(Subscription subscription) {
return subscriptions.add(subscription);
return this.subscriptions.add(subscription);
}
/*
TODO: document state transitions
*/
@Override
public void execute() {
state = State.Running;
for (Subscription sub : subscriptions) {
sub.publish(this, message);
this.state = State.Running;
for (Subscription sub : this.subscriptions) {
sub.publish(this, this.message);
}
state = State.Finished;
this.state = State.Finished;
// if the message has not been marked delivered by the dispatcher
if (!delivered) {
if (!isFilteredEvent() && !isDeadEvent()) {
runtime.getProvider().publish(new FilteredMessage(message));
} else if (!isDeadEvent()) {
runtime.getProvider().publish(new DeadMessage(message));
if (!this.delivered) {
if (!isDeadEvent()) {
this.runtime.getProvider().publish(new DeadMessage(this.message));
}
}
}
@Override
public boolean isFinished() {
return state.equals(State.Finished);
return this.state.equals(State.Finished);
}
@Override
public boolean isRunning() {
return state.equals(State.Running);
return this.state.equals(State.Running);
}
@Override
public boolean isScheduled() {
return state.equals(State.Scheduled);
return this.state.equals(State.Scheduled);
}
@Override
public void markDelivered() {
delivered = true;
this.delivered = true;
}
@Override
public MessagePublication markScheduled() {
if (state.equals(State.Initial)) {
state = State.Scheduled;
if (this.state.equals(State.Initial)) {
this.state = State.Scheduled;
}
return this;
}
@Override
public boolean isDeadEvent() {
return DeadMessage.class.equals(message.getClass());
}
public boolean isFilteredEvent() {
return FilteredMessage.class.equals(message.getClass());
return DeadMessage.class.equals(this.message.getClass());
}
@Override
public Object getMessage() {
return message;
return this.message;
}
private enum State {

View File

@ -1,17 +0,0 @@
package net.engio.mbassy.bus.common;
/**
* A filtered message event is published when there have been matching subscriptions for a given
* message publication but configured filters prevented the message from being delivered to
* any of the handlers.
*
* @author bennidi
* Date: 3/1/13
*/
public final class FilteredMessage extends PublicationEvent {
public FilteredMessage(Object event) {
super(event);
}
}

View File

@ -1,14 +0,0 @@
package net.engio.mbassy.common;
/**
* Created with IntelliJ IDEA.
*
* @author bennidi
* Date: 10/22/12
* Time: 9:33 AM
* To change this template use File | Settings | File Templates.
*/
public interface IPredicate<T> {
boolean apply(T target);
}

View File

@ -8,6 +8,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import net.engio.mbassy.listener.Handler;
/**
* @author bennidi
* Date: 2/16/12
@ -16,45 +18,45 @@ import java.util.Set;
public class ReflectionUtils
{
public static List<Method> getMethods( IPredicate<Method> condition, Class<?> target ) {
List<Method> methods = new LinkedList<Method>();
try {
for ( Method method : target.getDeclaredMethods() ) {
if ( condition.apply( method ) ) {
methods.add( method );
}
}
}
catch ( Exception e ) {
//nop
}
if ( !target.equals( Object.class ) ) {
methods.addAll( getMethods( condition, target.getSuperclass() ) );
}
return methods;
}
// modified by dorkbox, llc 2015
public static List<Method> getMethods(Class<?> target) {
List<Method> methods = new LinkedList<Method>();
try {
for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) {
methods.add(method);
}
}
} catch (Exception ignored) {
}
/**
* Traverses the class hierarchy upwards, starting at the given subclass, looking
* for an override of the given methods -> finds the bottom most override of the given
* method if any exists
*
* @param overridingMethod
* @param subclass
* @return
*/
public static Method getOverridingMethod( final Method overridingMethod, final Class subclass ) {
Class current = subclass;
while ( !current.equals( overridingMethod.getDeclaringClass() ) ) {
try {
return current.getDeclaredMethod( overridingMethod.getName(), overridingMethod.getParameterTypes() );
}
catch ( NoSuchMethodException e ) {
current = current.getSuperclass();
}
}
return null;
}
if (!target.equals(Object.class)) {
methods.addAll(getMethods(target.getSuperclass()));
}
return methods;
}
/**
* Traverses the class hierarchy upwards, starting at the given subclass, looking
* for an override of the given methods -> finds the bottom most override of the given
* method if any exists
*
* @param overridingMethod
* @param subclass
* @return
*/
public static Method getOverridingMethod( final Method overridingMethod, final Class<?> subclass ) {
Class<?> current = subclass;
while ( !current.equals( overridingMethod.getDeclaringClass() ) ) {
try {
return current.getDeclaredMethod( overridingMethod.getName(), overridingMethod.getParameterTypes() );
}
catch ( NoSuchMethodException e ) {
current = current.getSuperclass();
}
}
return null;
}
/**
* Collect all directly and indirectly related super types (classes and interfaces) of
@ -63,48 +65,53 @@ public class ReflectionUtils
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static Set<Class> getSuperTypes(Class from) {
Set<Class> superclasses = new HashSet<Class>();
collectInterfaces( from, superclasses );
while ( !from.equals( Object.class ) && !from.isInterface() ) {
superclasses.add( from.getSuperclass() );
from = from.getSuperclass();
collectInterfaces( from, superclasses );
}
return superclasses;
}
public static Set<Class<?>> getSuperTypes(Class<?> from) {
Set<Class<?>> superclasses = new HashSet<Class<?>>();
collectInterfaces( from, superclasses );
while ( !from.equals( Object.class ) && !from.isInterface() ) {
superclasses.add( from.getSuperclass() );
from = from.getSuperclass();
collectInterfaces( from, superclasses );
}
return superclasses;
}
public static void collectInterfaces( Class from, Set<Class> accumulator ) {
for ( Class intface : from.getInterfaces() ) {
accumulator.add( intface );
collectInterfaces( intface, accumulator );
}
}
public static void collectInterfaces( Class<?> from, Set<Class<?>> accumulator ) {
for ( Class<?> intface : from.getInterfaces() ) {
accumulator.add( intface );
collectInterfaces( intface, accumulator );
}
}
public static boolean containsOverridingMethod( final List<Method> allMethods, final Method methodToCheck ) {
for ( Method method : allMethods ) {
if ( isOverriddenBy( methodToCheck, method ) ) {
return true;
}
}
return false;
}
//
public static boolean containsOverridingMethod(final List<Method> allMethods, final Method methodToCheck) {
for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) {
return true;
}
}
return false;
}
/**
* Searches for an Annotation of the given type on the class. Supports meta annotations.
*
* @param from AnnotatedElement (class, method...)
* @param annotationType Annotation class to look for.
* @param <A> Class of annotation type
* @return Annotation instance or null
*/
private static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType, Set<AnnotatedElement> visited) {
if( visited.contains(from) ) return null;
/**
* Searches for an Annotation of the given type on the class. Supports meta annotations.
*
* @param from AnnotatedElement (class, method...)
* @param annotationType Annotation class to look for.
* @param <A> Class of annotation type
* @return Annotation instance or null
*/
private static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType, Set<AnnotatedElement> visited) {
if( visited.contains(from) ) {
return null;
}
visited.add(from);
A ann = from.getAnnotation( annotationType );
if( ann != null) return ann;
if( ann != null) {
return ann;
}
for ( Annotation metaAnn : from.getAnnotations() ) {
ann = getAnnotation(metaAnn.annotationType(), annotationType, visited);
if ( ann != null ) {
@ -112,32 +119,34 @@ public class ReflectionUtils
}
}
return null;
}
}
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){
return getAnnotation(from, annotationType, new HashSet<AnnotatedElement>());
}
private static boolean isOverriddenBy( Method superclassMethod, Method subclassMethod ) {
// if the declaring classes are the same or the subclass method is not defined in the subclass
// hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod
if ( superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass() )
//
private static boolean isOverriddenBy( Method superclassMethod, Method subclassMethod ) {
// if the declaring classes are the same or the subclass method is not defined in the subclass
// hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod
if ( superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass() )
|| !superclassMethod.getDeclaringClass().isAssignableFrom( subclassMethod.getDeclaringClass() )
|| !superclassMethod.getName().equals(subclassMethod.getName())) {
return false;
}
return false;
}
Class[] superClassMethodParameters = superclassMethod.getParameterTypes();
Class[] subClassMethodParameters = subclassMethod.getParameterTypes();
// method must specify the same number of parameters
//the parameters must occur in the exact same order
for ( int i = 0; i < subClassMethodParameters.length; i++ ) {
if ( !superClassMethodParameters[i].equals( subClassMethodParameters[i] ) ) {
return false;
}
}
return true;
}
Class<?>[] superClassMethodParameters = superclassMethod.getParameterTypes();
Class<?>[] subClassMethodParameters = subclassMethod.getParameterTypes();
// method must specify the same number of parameters
//the parameters must occur in the exact same order
for ( int i = 0; i < subClassMethodParameters.length; i++ ) {
if ( !superClassMethodParameters[i].equals( subClassMethodParameters[i] ) ) {
return false;
}
}
return true;
}
}

View File

@ -1,81 +0,0 @@
package net.engio.mbassy.common;
import java.util.HashMap;
import java.util.Iterator;
/**
* This implementation uses strong references to the elements.
* <p/>
*
* @author bennidi
* Date: 2/12/12
*/
public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T>{
public StrongConcurrentSet() {
super(new HashMap<T, ISetEntry<T>>());
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private ISetEntry<T> current = head;
public boolean hasNext() {
return current != null;
}
public T next() {
if (current == null) {
return null;
}
else {
T value = current.getValue();
current = current.next();
return value;
}
}
public void remove() {
if (current == null) {
return;
}
ISetEntry<T> newCurrent = current.next();
StrongConcurrentSet.this.remove(current.getValue());
current = newCurrent;
}
};
}
@Override
protected Entry<T> createEntry(T value, Entry<T> next) {
return next != null ? new StrongEntry<T>(value, next) : new StrongEntry<T>(value);
}
public static class StrongEntry<T> extends Entry<T> {
private T value;
private StrongEntry(T value, Entry<T> next) {
super(next);
this.value = value;
}
private StrongEntry(T value) {
super();
this.value = value;
}
@Override
public T getValue() {
return value;
}
}
}

View File

@ -1,38 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import java.util.concurrent.ExecutorService;
/**
* This invocation will schedule the wrapped (decorated) invocation to be executed asynchronously
*
* @author bennidi
* Date: 11/23/12
*/
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
private final IHandlerInvocation delegate;
private final ExecutorService executor;
public AsynchronousHandlerInvocation(IHandlerInvocation delegate) {
super(delegate.getContext());
this.delegate = delegate;
this.executor = delegate.getContext().getRuntime().get(BusRuntime.Properties.AsynchronousHandlerExecutor);
}
/**
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message){
executor.execute(new Runnable() {
@Override
public void run() {
delegate.invoke(listener, message);
}
});
}
}

View File

@ -1,26 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.MessageEnvelope;
/**
* The enveloped dispatcher will wrap published messages in an envelope before
* passing them to their configured dispatcher.
* <p/>
* All enveloped message handlers will have this dispatcher in their chain
*
* @author bennidi
* Date: 12/12/12
*/
public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher {
public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) {
super(dispatcher);
}
@Override
public void dispatch(IMessagePublication publication, Object message, Iterable listeners){
getDelegate().dispatch(publication, new MessageEnvelope(message), listeners);
}
}

View File

@ -1,45 +0,0 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.listener.IMessageFilter;
/**
* A dispatcher that implements message filtering based on the filter configuration
* of the associated message handler. It will delegate message delivery to another
* message dispatcher after having performed the filtering logic.
*
* @author bennidi
* Date: 11/23/12
*/
public final class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
private final IMessageFilter[] filter;
public FilteredMessageDispatcher(IMessageDispatcher dispatcher) {
super(dispatcher);
this.filter = dispatcher.getContext().getHandlerMetadata().getFilter();
}
private boolean passesFilter(Object message) {
if (filter == null) {
return true;
} else {
for (IMessageFilter aFilter : filter) {
if (!aFilter.accepts(message, getContext().getHandlerMetadata())) {
return false;
}
}
return true;
}
}
@Override
public void dispatch(IMessagePublication publication, Object message, Iterable listeners){
if (passesFilter(message)) {
getDelegate().dispatch(publication, message, listeners);
}
}
}

View File

@ -1,72 +0,0 @@
package net.engio.mbassy.dispatch.el;
import net.engio.mbassy.listener.IMessageFilter;
import net.engio.mbassy.listener.MessageHandler;
import javax.el.ExpressionFactory;
import javax.el.ValueExpression;
/*****************************************************************************
* A filter that will use a expression from the handler annotation and
* parse it as EL.
****************************************************************************/
public class ElFilter implements IMessageFilter {
// thread-safe initialization of EL factory singleton
public static final class ExpressionFactoryHolder{
// if runtime exception is thrown, this will
public static final ExpressionFactory ELFactory = getELFactory();
/*************************************************************************
* Get an implementation of the ExpressionFactory. This uses the
* Java service lookup mechanism to find a proper implementation.
* If none if available we do not support EL filters.
************************************************************************/
private static final ExpressionFactory getELFactory(){
try {
return ExpressionFactory.newInstance();
} catch (RuntimeException e) {
return null;
}
}
}
public static final boolean isELAvailable(){
return ExpressionFactoryHolder.ELFactory != null;
}
public static final ExpressionFactory ELFactory(){
return ExpressionFactoryHolder.ELFactory;
}
/**
* Accepts a message if the associated EL expression of the message handler resolves to 'true'
*
* @param message the message to be handled by the handler
* @param metadata the metadata object which describes the message handler
* @return
*/
@Override
public boolean accepts(Object message, MessageHandler metadata) {
String expression = metadata.getCondition();
StandardELResolutionContext context = new StandardELResolutionContext(message);
return evalExpression(expression, context);
}
private boolean evalExpression(String expression, StandardELResolutionContext context) {
ValueExpression ve = ELFactory().createValueExpression(context, expression, Boolean.class);
try{
Object result = ve.getValue(context);
return (Boolean)result;
}
catch(Throwable exception){
// TODO: BusRuntime should be available in this filter to propagate resolution errors
// -> this is generally a good feature for filters
return false;
//throw new IllegalStateException("A handler uses an EL filter but the output is not \"true\" or \"false\".");
}
}
}

View File

@ -1,92 +0,0 @@
package net.engio.mbassy.dispatch.el;
import javax.el.*;
import java.lang.reflect.Method;
/**
* This ELContext implementation provides support for standard BeanEL resolution in conditional message handlers.
* The message parameter of the message handlers is bound to 'msg' such that it can be referenced int the EL expressions.
*
* Example:
* @Handler(condition = "msg.type == 'onClick'")
* public void handle(ButtonEvent event)
*
*/
public class StandardELResolutionContext extends ELContext {
private final ELResolver resolver;
private final FunctionMapper functionMapper;
private final VariableMapper variableMapper;
private final Object message;
public StandardELResolutionContext(Object message) {
super();
this.message = message;
this.functionMapper = new NoopFunctionMapper();
this.variableMapper = new MsgMapper();
// Composite resolver not necessary as the only resolution type currently supported is standard BeanEL
//this.resolver = new CompositeELResolver();
this.resolver = new BeanELResolver(true);
}
/*************************************************************************
* The resolver for the event object.
* @see javax.el.ELContext#getELResolver()
************************************************************************/
@Override
public ELResolver getELResolver() {
return this.resolver;
}
/*************************************************************************
* @see javax.el.ELContext#getFunctionMapper()
************************************************************************/
@Override
public FunctionMapper getFunctionMapper() {
return this.functionMapper;
}
/*************************************************************************
* @see javax.el.ELContext#getVariableMapper()
************************************************************************/
@Override
public VariableMapper getVariableMapper() {
return this.variableMapper;
}
/**
* This mapper resolves the variable identifies "msg" to the message
* object of the current handler invocation
*/
private class MsgMapper extends VariableMapper {
private static final String msg = "msg";
// reuse the same expression as it always resolves to the same object
private final ValueExpression msgExpression = ElFilter.ELFactory().createValueExpression(message, message.getClass());
public ValueExpression resolveVariable(final String s) {
// resolve 'msg' to the message object of the handler invocation
return !s.equals(msg) ? null : msgExpression;
}
public ValueExpression setVariable(String s,
ValueExpression valueExpression) {
// not necessary - the mapper resolves only "msg" and nothing else
return null;
}
}
/**
* This function mapper does nothing, i.e. custom EL functions are not
* supported by default. It may be supported in the future to pass in
* custom function mappers at bus instanciation time.
*/
private class NoopFunctionMapper extends FunctionMapper {
public Method resolveFunction(String s, String s1) {
return null;
}
}
}

View File

@ -1,27 +0,0 @@
package net.engio.mbassy.listener;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Configure a handler to receive an enveloped message as a wrapper around the source
* message. An enveloped message can contain any type of message
*
* @author bennidi
* Date: 2/8/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Enveloped {
/**
* The set of messages that should be dispatched to the message handler
*/
Class[] messages();
}

View File

@ -1,30 +0,0 @@
package net.engio.mbassy.listener;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The filter annotation is used to add filters to message listeners.
* It references a class that implements the IMessageFilter interface.
* The filter will be used to check whether a message should be delivered
* to the listener or not.
* <p/>
* <p/>
*
* @author bennidi
* Date: 2/14/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.ANNOTATION_TYPE})
public @interface Filter {
/**
* The class that implements the filter.
* Note: A filter always needs to provide a non-arg constructor
*
* @return
*/
Class<? extends IMessageFilter> value();
}

View File

@ -1,53 +0,0 @@
package net.engio.mbassy.listener;
/**
* A set of standard filters for common use cases.
*
* @author bennidi
* Date: 12/12/12
*/
public class Filters {
/**
* This filter will only accept messages of the exact same type
* as specified for the handler. Subclasses (this includes interface implementations)
* will be rejected.
*
* NOTE: The same functionality (with better performance) is achieved using {@code rejectSubtypes = true}
* in the @Handler annotation
*/
public static final class RejectSubtypes implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandler metadata) {
for (Class handledMessage : metadata.getHandledMessages()) {
if (handledMessage.equals(event.getClass())) {
return true;
}
}
return false;
}
}
/**
* This filter will only accept messages that are real subtypes
* of the specified message types handled by the message handler.
* Example: If the handler handles Object.class the filter accepts
* all objects except any direct instance of Object.class {@code new Object()}
*/
public static final class SubtypesOnly implements IMessageFilter{
@Override
public boolean accepts(Object message, MessageHandler metadata) {
for(Class acceptedClasses : metadata.getHandledMessages()){
if(acceptedClasses.isAssignableFrom(message.getClass())
&& ! acceptedClasses.equals(message.getClass()))
return true;
}
return false;
}
}
}

View File

@ -1,9 +1,10 @@
package net.engio.mbassy.listener;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import java.lang.annotation.*;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Mark any method of any class(=listener) as a message handler and configure the handler
@ -17,35 +18,6 @@ import java.lang.annotation.*;
@Target(value = {ElementType.METHOD,ElementType.ANNOTATION_TYPE})
public @interface Handler {
/**
* Add any numbers of filters to the handler. All filters are evaluated before the handler
* is actually invoked, which is only if all the filters accept the message.
*/
Filter[] filters() default {};
/**
* Defines a filter condition as Expression Language. This can be used to filter the events based on
* attributes of the event object. Note that the expression must resolve to either
* <code>true</code> to allow the event or <code>false</code> to block it from delivery to the handler.
* The message itself is available as "msg" variable.
* @return the condition in EL syntax.
*/
String condition() default "";
/**
* Define the mode in which a message is delivered to each listener. Listeners can be notified
* sequentially or concurrently.
*/
Invoke delivery() default Invoke.Synchronously;
/**
* Handlers are ordered by priority and handlers with higher priority are processed before
* those with lower priority, i.e. Influence the order in which different handlers that consume
* the same message type are invoked.
*/
int priority() default 0;
/**
* Define whether or not the handler accepts sub types of the message type it declares in its
* signature.
@ -59,18 +31,4 @@ public @interface Handler {
* handlers that have been declared by a superclass but do not apply to the subclass
*/
boolean enabled() default true;
/**
* Each handler call is implemented as an invocation object that implements the invocation mechanism.
* The basic implementation uses reflection and is the default. It is possible though to provide a custom
* invocation to add additional logic.
*
* Note: Providing a custom invocation will most likely reduce performance, since the JIT-Compiler
* can not do some of its sophisticated byte code optimizations.
*
*/
Class<? extends HandlerInvocation> invocation() default ReflectiveHandlerInvocation.class;
}

View File

@ -1,40 +0,0 @@
package net.engio.mbassy.listener;
/**
* Message filters can be used to control what messages are delivered to a specific message handler.
* Filters are attached to message handler using the @Listener annotation.
* If a message handler specifies filters, the filters accepts(...) method will be checked before the actual handler is invoked.
* The handler will be invoked only if each filter accepted the message.
*
* Example:
*
* @Lister
* @Filters(Urlfilter.class)
* public void someHandler(String message){...}
*
* class Urlfilter implements IMessageFilter<String>{
* public boolean accepts(String message, MessageHandler metadata){
* return message.startsWith("http");
* }
* }
*
* bus.post("http://www.infoq.com"); // will be delivered
* bus.post("www.stackoverflow.com"); // will not be delivered
*
* NOTE: A message filter must provide a no-arg constructor!!!
*
* @author bennidi
* Date: 2/8/12
*/
public interface IMessageFilter<M> {
/**
* Check the message for whatever criteria
*
* @param message the message to be handled by the handler
* @param metadata the metadata object which describes the message handler
* @return true: if the message matches the criteria and should be delivered to the handler
* false: otherwise
*/
boolean accepts(M message, MessageHandler metadata);
}

View File

@ -1,11 +0,0 @@
package net.engio.mbassy.listener;
/**
* Created with IntelliJ IDEA.
*
* @author bennidi
* Date: 11/16/12
*/
public enum Invoke {
Synchronously, Asynchronously
}

View File

@ -1,6 +1,10 @@
package net.engio.mbassy.listener;
import java.lang.annotation.*;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*
@ -17,12 +21,4 @@ import java.lang.annotation.*;
@Inherited
public @interface Listener {
/**
* By default, references to message listeners are weak to eliminate risks of memory leaks.
* It is possible to use strong references instead.
*
* @return
*/
References references() default References.Weak;
}

View File

@ -1,13 +1,9 @@
package net.engio.mbassy.listener;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.el.ElFilter;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import net.engio.mbassy.common.ReflectionUtils;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
@ -19,198 +15,57 @@ import java.util.Map;
*/
public class MessageHandler {
public static final class Properties{
public static final String HandlerMethod = "handler";
public static final String InvocationMode = "invocationMode";
public static final String Filter = "filter";
public static final String Condition = "condition";
public static final String Enveloped = "envelope";
public static final String HandledMessages = "messages";
public static final String IsSynchronized = "synchronized";
public static final String Listener = "listener";
public static final String AcceptSubtypes = "subtypes";
public static final String Priority = "priority";
public static final String Invocation = "invocation";
/**
* Create the property map for the {@link MessageHandler} constructor using the default objects.
*
* @param handler The handler annotated method of the listener
* @param handlerConfig The annotation that configures the handler
* @param filter The set of preconfigured filters if any
* @param listenerConfig The listener metadata
* @return A map of properties initialized from the given parameters that will conform to the requirements of the
* {@link MessageHandler} constructor. See {@see MessageHandler.validate()} for more details.
*/
public static final Map<String, Object> Create(Method handler, Handler handlerConfig, IMessageFilter[] filter, MessageListener listenerConfig){
if(handler == null){
throw new IllegalArgumentException("The message handler configuration may not be null");
}
if(filter == null){
filter = new IMessageFilter[]{};
}
Enveloped enveloped = ReflectionUtils.getAnnotation( handler, Enveloped.class );
Class[] handledMessages = enveloped != null
? enveloped.messages()
: handler.getParameterTypes();
handler.setAccessible(true);
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(HandlerMethod, handler);
// add EL filter if a condition is present
if(handlerConfig.condition().length() > 0){
if (!ElFilter.isELAvailable()) {
throw new IllegalStateException("A handler uses an EL filter but no EL implementation is available.");
}
IMessageFilter[] expandedFilter = new IMessageFilter[filter.length + 1];
for(int i = 0; i < filter.length ; i++){
expandedFilter[i] = filter[i];
}
expandedFilter[filter.length] = new ElFilter();
filter = expandedFilter;
}
properties.put(Filter, filter);
properties.put(Condition, cleanEL(handlerConfig.condition()));
properties.put(Priority, handlerConfig.priority());
properties.put(Invocation, handlerConfig.invocation());
properties.put(InvocationMode, handlerConfig.delivery());
properties.put(Enveloped, enveloped != null);
properties.put(AcceptSubtypes, !handlerConfig.rejectSubtypes());
properties.put(Listener, listenerConfig);
properties.put(IsSynchronized, ReflectionUtils.getAnnotation( handler, Synchronized.class) != null);
properties.put(HandledMessages, handledMessages);
return properties;
}
private static String cleanEL(String expression) {
if (!expression.trim().startsWith("${") && !expression.trim().startsWith("#{")) {
expression = "${"+expression+"}";
}
return expression;
}
}
private final Method handler;
private final IMessageFilter[] filter;
private final String condition;
private final int priority;
private final Class<? extends HandlerInvocation> invocation;
private final Invoke invocationMode;
private final boolean isEnvelope;
private final Class[] handledMessages;
private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes;
private final MessageListener listenerConfig;
private final boolean isSynchronized;
public MessageHandler(Map<String, Object> properties){
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){
super();
validate(properties);
this.handler = (Method)properties.get(Properties.HandlerMethod);
this.filter = (IMessageFilter[])properties.get(Properties.Filter);
this.condition = (String)properties.get(Properties.Condition);
this.priority = (Integer)properties.get(Properties.Priority);
this.invocation = (Class<? extends HandlerInvocation>)properties.get(Properties.Invocation);
this.invocationMode = (Invoke)properties.get(Properties.InvocationMode);
this.isEnvelope = (Boolean)properties.get(Properties.Enveloped);
this.acceptsSubtypes = (Boolean)properties.get(Properties.AcceptSubtypes);
this.listenerConfig = (MessageListener)properties.get(Properties.Listener);
this.isSynchronized = (Boolean)properties.get(Properties.IsSynchronized);
this.handledMessages = (Class[])properties.get(Properties.HandledMessages);
}
private void validate(Map<String, Object> properties){
Object[][] expectedProperties = new Object[][]{
new Object[]{Properties.HandlerMethod, Method.class },
new Object[]{Properties.Priority, Integer.class },
new Object[]{Properties.Invocation, Class.class },
new Object[]{Properties.Filter, IMessageFilter[].class },
new Object[]{Properties.Condition, String.class },
new Object[]{Properties.Enveloped, Boolean.class },
new Object[]{Properties.HandledMessages, Class[].class },
new Object[]{Properties.IsSynchronized, Boolean.class },
new Object[]{Properties.Listener, MessageListener.class },
new Object[]{Properties.AcceptSubtypes, Boolean.class }
};
for(Object[] property : expectedProperties){
if (properties.get(property[0]) == null || !((Class)property[1]).isAssignableFrom(properties.get(property[0]).getClass()))
throw new IllegalArgumentException("Property " + property[0] + " was expected to be not null and of type " + property[1]
+ " but was: " + properties.get(property[0]));
if (handler == null) {
throw new IllegalArgumentException("The message handler configuration may not be null");
}
Class<?>[] handledMessages = handler.getParameterTypes();
handler.setAccessible(true);
this.handler = handler;
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
this.listenerConfig = listenerMetadata;
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
}
public <A extends Annotation> A getAnnotation(Class<A> annotationType){
return ReflectionUtils.getAnnotation(handler,annotationType);
return ReflectionUtils.getAnnotation(this.handler,annotationType);
}
public boolean isSynchronized(){
return isSynchronized;
return this.isSynchronized;
}
public boolean useStrongReferences(){
return listenerConfig.useStrongReferences();
}
public boolean isFromListener(Class listener){
return listenerConfig.isFromListener(listener);
}
public boolean isAsynchronous() {
return invocationMode.equals(Invoke.Asynchronously);
}
public boolean isFiltered() {
return filter.length > 0 || (condition != null && condition.trim().length() > 0);
}
public int getPriority() {
return priority;
public boolean isFromListener(Class<?> listener){
return this.listenerConfig.isFromListener(listener);
}
public Method getHandler() {
return handler;
return this.handler;
}
public IMessageFilter[] getFilter() {
return filter;
}
public String getCondition() {
return this.condition;
}
public Class[] getHandledMessages() {
return handledMessages;
}
public boolean isEnveloped() {
return isEnvelope;
}
public Class<? extends HandlerInvocation> getHandlerInvocation(){
return invocation;
public Class<?>[] getHandledMessages() {
return this.handledMessages;
}
public boolean handlesMessage(Class<?> messageType) {
for (Class<?> handledMessage : handledMessages) {
for (Class<?> handledMessage : this.handledMessages) {
if (handledMessage.equals(messageType)) {
return true;
}
if (handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) {
return true;
}
@ -219,7 +74,6 @@ public class MessageHandler {
}
public boolean acceptsSubtypes() {
return acceptsSubtypes;
return this.acceptsSubtypes;
}
}

View File

@ -1,8 +1,5 @@
package net.engio.mbassy.listener;
import net.engio.mbassy.common.IPredicate;
import net.engio.mbassy.common.ReflectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@ -23,55 +20,36 @@ import java.util.List;
* @author bennidi
* Date: 12/16/12
*/
public class MessageListener<T> {
public static IPredicate<MessageHandler> ForMessage(final Class<?> messageType) {
return new IPredicate<MessageHandler>() {
@Override
public boolean apply(MessageHandler target) {
return target.handlesMessage(messageType);
}
};
}
public class MessageListener {
private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
private Class<?> listenerDefinition;
private Class<T> listenerDefinition;
private Listener listenerAnnotation;
public MessageListener(Class<T> listenerDefinition) {
public MessageListener(Class<?> listenerDefinition) {
this.listenerDefinition = listenerDefinition;
listenerAnnotation = ReflectionUtils.getAnnotation( listenerDefinition, Listener.class );
}
public boolean isFromListener(Class listener){
return listenerDefinition.equals(listener);
}
public boolean useStrongReferences(){
return listenerAnnotation != null && listenerAnnotation.references().equals(References.Strong);
public boolean isFromListener(Class<?> listener){
return this.listenerDefinition.equals(listener);
}
public MessageListener addHandlers(Collection<? extends MessageHandler> c) {
handlers.addAll(c);
this.handlers.addAll(c);
return this;
}
public boolean addHandler(MessageHandler messageHandler) {
return handlers.add(messageHandler);
return this.handlers.add(messageHandler);
}
public List<MessageHandler> getHandlers(){
return handlers;
return this.handlers;
}
public List<MessageHandler> getHandlers(IPredicate<MessageHandler> filter) {
public List<MessageHandler> getHandlers(Class<?> messageType) {
List<MessageHandler> matching = new LinkedList<MessageHandler>();
for (MessageHandler handler : handlers) {
if (filter.apply(handler)) {
for (MessageHandler handler : this.handlers) {
if (handler.handlesMessage(messageType)) {
matching.add(handler);
}
}
@ -79,10 +57,10 @@ public class MessageListener<T> {
}
public boolean handles(Class<?> messageType) {
return !getHandlers(ForMessage(messageType)).isEmpty();
return !getHandlers(messageType).isEmpty();
}
public Class<T> getListerDefinition() {
return listenerDefinition;
public Class<?> getListerDefinition() {
return this.listenerDefinition;
}
}

View File

@ -1,14 +1,10 @@
package net.engio.mbassy.listener;
import net.engio.mbassy.common.IPredicate;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.subscription.MessageEnvelope;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import net.engio.mbassy.common.ReflectionUtils;
/**
* The meta data reader is responsible for parsing and validating message handler configurations.
@ -18,46 +14,13 @@ import java.util.Map;
*/
public class MetadataReader {
// This predicate is used to find all message listeners (methods annotated with @Handler)
private static final IPredicate<Method> AllMessageHandlers = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return ReflectionUtils.getAnnotation(target, Handler.class) != null;
}
};
// cache already created filter instances
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
// retrieve all instances of filters associated with the given subscription
private IMessageFilter[] getFilter(Handler subscription) {
if (subscription.filters().length == 0) {
return null;
}
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
int i = 0;
for (Filter filterDef : subscription.filters()) {
IMessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
try {
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
} catch (Exception e) {
throw new RuntimeException(e);// propagate as runtime exception
}
}
filters[i] = filter;
i++;
}
return filters;
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public MessageListener getMessageListener(Class target) {
MessageListener listenerMetadata = new MessageListener(target);
public MessageListener getMessageListener(Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
List<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
List<Method> bottomMostHandlers = new LinkedList<Method>();
for (Method handler : allHandlers) {
@ -66,6 +29,8 @@ public class MetadataReader {
}
}
MessageListener listenerMetadata = new MessageListener(target);
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
for (Method handler : bottomMostHandlers) {
@ -73,11 +38,14 @@ public class MetadataReader {
if (!handlerConfig.enabled() || !isValidMessageHandler(handler)) {
continue; // disabled or invalid listeners are ignored
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
if (overriddenHandler == null) {
overriddenHandler = handler;
}
// if a handler is overwritten it inherits the configuration of its parent method
Map<String, Object> handlerProperties = MessageHandler.Properties.Create(overriddenHandler == null ? handler : overriddenHandler,
handlerConfig, getFilter(handlerConfig), listenerMetadata);
MessageHandler handlerMetadata = new MessageHandler(handlerProperties);
MessageHandler handlerMetadata = new MessageHandler(overriddenHandler, handlerConfig, listenerMetadata);
listenerMetadata.addHandler(handlerMetadata);
}
@ -86,7 +54,7 @@ public class MetadataReader {
//TODO: change this to support MORE THAN ONE object in the signature
private boolean isValidMessageHandler(Method handler) {
if (handler == null || ReflectionUtils.getAnnotation( handler, Handler.class) == null) {
return false;
@ -97,15 +65,7 @@ public class MetadataReader {
+ "]. A messageHandler must define exactly one parameter");
return false;
}
Enveloped envelope = ReflectionUtils.getAnnotation( handler, Enveloped.class);
if (envelope != null && !MessageEnvelope.class.isAssignableFrom(handler.getParameterTypes()[0])) {
System.out.println("Message envelope configured but no subclass of MessageEnvelope found as parameter");
return false;
}
if (envelope != null && envelope.messages().length == 0) {
System.out.println("Message envelope configured but message types defined for handler");
return false;
}
return true;
}

View File

@ -1,10 +0,0 @@
package net.engio.mbassy.listener;
/**
*
* @author bennidi
* Date: 3/29/13
*/
public enum References {
Strong,Weak
}

View File

@ -1,22 +0,0 @@
package net.engio.mbassy.subscription;
/**
* A message envelope is used to wrap messages of arbitrary type such that a handler
* my receive messages of different types.
*
* @author bennidi
* Date: 12/12/12
*/
public class MessageEnvelope {
// Internal state
private Object message;
public MessageEnvelope(Object message) {
this.message = message;
}
public <T> T getMessage() {
return (T) message;
}
}

View File

@ -4,9 +4,6 @@ import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import java.util.Comparator;
import java.util.UUID;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (exlcuding subclasses) of a SingleMessageHandler.class
@ -21,8 +18,6 @@ import java.util.UUID;
*/
public class Subscription {
private final UUID id = UUID.randomUUID();
protected final IConcurrentSet<Object> listeners;
private final IMessageDispatcher dispatcher;
@ -41,8 +36,8 @@ public class Subscription {
* @param listener
* @return
*/
public boolean belongsTo(Class listener){
return context.getHandlerMetadata().isFromListener(listener);
public boolean belongsTo(Class<?> listener){
return this.context.getHandlerMetadata().isFromListener(listener);
}
/**
@ -51,7 +46,7 @@ public class Subscription {
* @return
*/
public boolean contains(Object listener){
return listeners.contains(listener);
return this.listeners.contains(listener);
}
/**
@ -60,46 +55,30 @@ public class Subscription {
* @return
*/
public boolean handlesMessageType(Class<?> messageType) {
return context.getHandlerMetadata().handlesMessage(messageType);
return this.context.getHandlerMetadata().handlesMessage(messageType);
}
public Class[] getHandledMessageTypes(){
return context.getHandlerMetadata().getHandledMessages();
public Class<?>[] getHandledMessageTypes(){
return this.context.getHandlerMetadata().getHandledMessages();
}
public void publish(IMessagePublication publication, Object message){
if(listeners.size() > 0)
dispatcher.dispatch(publication, message, listeners);
if(this.listeners.size() > 0) {
this.dispatcher.dispatch(publication, message, this.listeners);
}
}
public int getPriority() {
return context.getHandlerMetadata().getPriority();
}
public void subscribe(Object o) {
listeners.add(o);
this.listeners.add(o);
}
public boolean unsubscribe(Object existingListener) {
return listeners.remove(existingListener);
return this.listeners.remove(existingListener);
}
public int size() {
return listeners.size();
return this.listeners.size();
}
public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
@Override
public int compare(Subscription o1, Subscription o2) {
int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority;
}
};
}

View File

@ -1,17 +1,21 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.listener.MessageHandler;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Collection;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.MessageDispatcher;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.listener.MessageHandler;
/**
* The subscription factory is used to create an empty subscription for specific message handler.
* The message handler's configuration is evaluated and a corresponding subscription is built.
@ -24,9 +28,7 @@ public class SubscriptionFactory {
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher, handlerMetadata.useStrongReferences()
? new StrongConcurrentSet<Object>()
: new WeakConcurrentSet<Object>());
return new Subscription(context, dispatcher, new WeakConcurrentSet<Object>());
} catch (Exception e) {
throw new MessageBusException(e);
}
@ -37,25 +39,18 @@ public class SubscriptionFactory {
if(context.getHandlerMetadata().isSynchronized()){
invocation = new SynchronizedHandlerInvocation(invocation);
}
if (context.getHandlerMetadata().isAsynchronous()) {
invocation = new AsynchronousHandlerInvocation(invocation);
}
return invocation;
}
protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
if (context.getHandlerMetadata().isEnveloped()) {
dispatcher = new EnvelopedMessageDispatcher(dispatcher);
}
if (context.getHandlerMetadata().isFiltered()) {
dispatcher = new FilteredMessageDispatcher(dispatcher);
}
return dispatcher;
}
protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws MessageBusException {
Class<? extends HandlerInvocation> invocation = context.getHandlerMetadata().getHandlerInvocation();
Class<? extends HandlerInvocation> invocation = ReflectiveHandlerInvocation.class;
if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){
throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class");
}

View File

@ -1,14 +1,19 @@
package net.engio.mbassy.subscription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.listener.MessageHandler;
import net.engio.mbassy.listener.MetadataReader;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
@ -20,24 +25,26 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class SubscriptionManager {
private static final Object setObject = new Object();
// the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader;
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage
= new HashMap<Class, Collection<Subscription>>(50);
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessage
= new HashMap<Class<?>, Collection<Subscription>>(50);
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class, Collection<Subscription>> subscriptionsPerListener
= new HashMap<Class, Collection<Subscription>>(50);
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener
= new HashMap<Class<?>, Collection<Subscription>>(50);
// remember already processed classes that do not contain any message handlers
private final StrongConcurrentSet<Class> nonListeners = new StrongConcurrentSet<Class>();
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
@ -74,33 +81,38 @@ public class SubscriptionManager {
private Collection<Subscription> getSubscriptionsByListener(Object listener) {
Collection<Subscription> subscriptions;
try {
readWriteLock.readLock().lock();
subscriptions = subscriptionsPerListener.get(listener.getClass());
this.readWriteLock.readLock().lock();
subscriptions = this.subscriptionsPerListener.get(listener.getClass());
} finally {
readWriteLock.readLock().unlock();
this.readWriteLock.readLock().unlock();
}
return subscriptions;
}
public void subscribe(Object listener) {
try {
if (isKnownNonListener(listener)) {
Class<? extends Object> listenerClass = listener.getClass();
if (this.nonListeners.contains(listenerClass)) {
return; // early reject of known classes that do not define message handlers
}
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
// a listener is either subscribed for the first time
if (subscriptionsByListener == null) {
List<MessageHandler> messageHandlers = metadataReader.getMessageListener(listener.getClass()).getHandlers();
List<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
nonListeners.add(listener.getClass());
this.nonListeners.put(listenerClass, this.nonListeners);
return;
}
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
// create subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
subscriptionsByListener.add(subscriptionFactory.createSubscription(runtime, messageHandler));
subscriptionsByListener.add(this.subscriptionFactory.createSubscription(this.runtime, messageHandler));
}
// this will acquire a write lock and handle the case when another thread already subscribed
// this particular listener in the mean-time
subscribe(listener, subscriptionsByListener);
@ -119,7 +131,7 @@ public class SubscriptionManager {
private void subscribe(Object listener, Collection<Subscription> subscriptions) {
try {
readWriteLock.writeLock().lock();
this.readWriteLock.writeLock().lock();
// basically this is a deferred double check
// it's an ugly pattern but necessary because atomic upgrade from read to write lock
// is not possible
@ -134,7 +146,7 @@ public class SubscriptionManager {
addMessageTypeSubscription(messageType, subscription);
}
}
subscriptionsPerListener.put(listener.getClass(), subscriptions);
this.subscriptionsPerListener.put(listener.getClass(), subscriptions);
}
// the rare case when multiple threads concurrently subscribed the same class for the first time
// one will be first, all others will have to subscribe to the existing instead the generated subscriptions
@ -144,29 +156,29 @@ public class SubscriptionManager {
}
}
} finally {
readWriteLock.writeLock().unlock();
this.readWriteLock.writeLock().unlock();
}
}
private boolean isKnownNonListener(Object listener) {
Class listeningClass = listener.getClass();
return nonListeners.contains(listeningClass);
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
try{
readWriteLock.readLock().lock();
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
// thread safe publication
Collection<Subscription> subscriptions = new LinkedList<Subscription>();
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
try{
this.readWriteLock.readLock().lock();
Collection<Subscription> collection = this.subscriptionsPerMessage.get(messageType);
if (collection != null) {
subscriptions.addAll(collection);
}
for (Class eventSuperType : ReflectionUtils.getSuperTypes(messageType)) {
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
// also add all subscriptions that match super types
for (Class<?> eventSuperType : ReflectionUtils.getSuperTypes(messageType)) {
Collection<Subscription> subs = this.subscriptionsPerMessage.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
@ -176,19 +188,20 @@ public class SubscriptionManager {
}
}
}finally{
readWriteLock.readLock().unlock();
this.readWriteLock.readLock().unlock();
}
return subscriptions;
}
// associate a subscription with a message type
// NOTE: Not thread-safe! must be synchronized in outer scope
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
private void addMessageTypeSubscription(Class<?> messageType, Subscription subscription) {
Collection<Subscription> subscriptions = this.subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
this.subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
}

View File

@ -11,19 +11,16 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
StrongConcurrentSetTest.class,
WeakConcurrentSetTest.class,
MBassadorTest.class,
SyncBusTest.MBassadorTest.class,
SyncBusTest.SyncMessageBusTest.class,
FilterTest.class,
MetadataReaderTest.class,
MethodDispatchTest.class,
DeadMessageTest.class,
SynchronizedHandlerTest.class,
SubscriptionManagerTest.class,
AsyncFIFOBusTest.class,
ConditionalHandlers.class
})
public class AllTests {
}

View File

@ -1,14 +1,14 @@
package net.engio.mbassy;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
import org.junit.Test;
/**
*
@ -35,11 +35,13 @@ public class AsyncFIFOBusTest extends MessageBusTest {
messages[i] = i;
}
// publish in ascending order
for(Integer message : messages)
for(Integer message : messages) {
fifoBUs.post(message).asynchronously();
}
while(fifoBUs.hasPendingMessages())
while(fifoBUs.hasPendingMessages()) {
pause(1000);
}
for(SyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
@ -69,19 +71,18 @@ public class AsyncFIFOBusTest extends MessageBusTest {
messages[i] = i;
}
// publish in ascending order
for(Integer message : messages)
for(Integer message : messages) {
fifoBUs.post(message).asynchronously();
}
while(fifoBUs.hasPendingMessages())
while(fifoBUs.hasPendingMessages()) {
pause(2000);
}
for(SyncAsyncListener listener : listeners){
assertEquals(messages.length, listener.receivedSync.size());
assertEquals(listener.receivedSync.size(), listener.receivedAsync.size());
for(int i=0; i < listener.receivedAsync.size(); i++){
for(int i=0; i < messages.length; i++){
assertEquals(messages[i], listener.receivedSync.get(i));
// sync and async in same order
assertEquals(listener.receivedSync.get(i), listener.receivedAsync.get(i));
}
}
@ -137,7 +138,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Handler
public void handleSync(Integer message){
receivedSync.add(message);
this.receivedSync.add(message);
}
}
@ -145,18 +146,11 @@ public class AsyncFIFOBusTest extends MessageBusTest {
public static class SyncAsyncListener {
private List<Integer> receivedSync = new LinkedList<Integer>();
private List<Integer> receivedAsync = new LinkedList<Integer>();
@Handler
public void handleSync(Integer message){
receivedSync.add(message);
this.receivedSync.add(message);
}
@Handler(delivery = Invoke.Asynchronously)
public void handleASync(Integer message){
receivedAsync.add(message);
}
}
}

View File

@ -1,167 +0,0 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.subscription.MessageEnvelope;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
/*****************************************************************************
* Some unit tests for the "condition" filter.
****************************************************************************/
public class ConditionalHandlers extends MessageBusTest {
public static class TestEvent {
private Set<String> handledBy = new HashSet<String>();
private String type;
private int size;
public TestEvent(String type, int size) {
super();
this.type = type;
this.size = size;
}
public String getType() {
return type;
}
public int getSize() {
return size;
}
public boolean wasHandledBy(String ...handlers){
for(String handler : handlers){
if (!handledBy.contains(handler)) return false;
}
return true;
}
public void handledBy(String handler){
handledBy.add(handler);
}
}
@Listener(references = References.Strong)
public static class ConditionalMessageListener {
@Handler(condition = "msg.type == 'TEST'")
public void handleTypeMessage(TestEvent message) {
message.handledBy("handleTypeMessage");
}
@Handler(condition = "msg.size > 4")
public void handleSizeMessage(TestEvent message) {
message.handledBy("handleSizeMessage");
}
@Handler(condition = "msg.foo > 4")
public void handleInvalidEL(TestEvent message) {
message.handledBy("handleInvalidEL");
}
@Handler(condition = "msg.size > 2 && msg.size < 4")
public void handleCombinedEL(TestEvent message) {
message.handledBy( "handleCombinedEL");
}
@Handler(condition = "msg.getType().equals('XYZ') && msg.getSize() == 1")
public void handleMethodAccessEL(TestEvent message) {
message.handledBy("handleMethodAccessEL");
}
@Handler(condition = "msg.type == 'TEST'")
@Enveloped(messages = {TestEvent.class, Object.class})
public void handleEnvelopedMessage(MessageEnvelope envelope) {
envelope.<TestEvent>getMessage().handledBy("handleEnvelopedMessage");
}
}
/*************************************************************************
* @throws Exception
************************************************************************/
@Test
public void testSimpleStringCondition() throws Exception {
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("TEST", 0);
bus.publish(message);
assertTrue(message.wasHandledBy("handleTypeMessage", "handleEnvelopedMessage"));
assertFalse(message.wasHandledBy("handleInvalidEL"));
}
/*************************************************************************
* @throws Exception
************************************************************************/
@Test
public void testSimpleNumberCondition() throws Exception {
MBassador bus = new MBassador();
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 5);
bus.publish(message);
assertTrue(message.wasHandledBy("handleSizeMessage"));
assertFalse(message.wasHandledBy("handleInvalidEL"));
}
/*************************************************************************
* @throws Exception
************************************************************************/
@Test
public void testHandleCombinedEL() throws Exception {
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 3);
bus.publish(message);
assertTrue(message.wasHandledBy("handleCombinedEL"));
assertFalse(message.wasHandledBy("handleInvalidEL"));
}
/*************************************************************************
* @throws Exception
************************************************************************/
@Test
public void testNotMatchingAnyCondition() throws Exception {
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 0);
bus.publish(message);
assertTrue(message.handledBy.isEmpty());
}
/*************************************************************************
* @throws Exception
************************************************************************/
@Test
public void testHandleMethodAccessEL() throws Exception {
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("XYZ", 1);
bus.publish(message);
assertTrue(message.wasHandledBy("handleMethodAccessEL"));
assertFalse(message.wasHandledBy("handleInvalidEL"));
}
}

View File

@ -1,37 +1,41 @@
package net.engio.mbassy;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.subscription.MessageEnvelope;
import org.junit.Test;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Synchronized;
import java.lang.annotation.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
/**
* Tests a custom handler annotation with a @Handler meta annotation and a default filter.
*/
public class CustomHandlerAnnotationTest extends MessageBusTest
{
/**
* Handler annotation that adds a default filter on the NamedMessage.
* Enveloped is in no way required, but simply added to test a meta enveloped annotation.
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Handler(filters = { @Filter(NamedMessageFilter.class) })
@Synchronized
@Target(value = { ElementType.METHOD, ElementType.ANNOTATION_TYPE })
static @interface NamedMessageHandler
{
/**
* @return The message names supported.
*/
String[] value();
}
/**
* Handler annotation that adds a default filter on the NamedMessage.
* Enveloped is in no way required, but simply added to test a meta enveloped annotation.
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Handler()
@Synchronized
@Target(value = { ElementType.METHOD, ElementType.ANNOTATION_TYPE })
static @interface NamedMessageHandler
{
/**
* @return The message names supported.
*/
String[] value();
}
/**
* Handler annotation that adds a default filter on the NamedMessage.
@ -44,100 +48,75 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
/**
* Test enveloped meta annotation.
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = { ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Inherited
@Handler(filters = { @Filter(NamedMessageFilter.class) })
@Enveloped(messages = NamedMessage.class)
static @interface EnvelopedNamedMessageHandler
{
/**
* @return The message names supported.
*/
String[] value();
}
/**
* Test enveloped meta annotation.
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = { ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Inherited
@Handler()
static @interface EnvelopedNamedMessageHandler
{
/**
* @return The message names supported.
*/
String[] value();
}
/**
* Searches for a NamedMessageHandler annotation on the handler method.
* The annotation specifies the supported message names.
*/
public static class NamedMessageFilter implements IMessageFilter<NamedMessage>
{
@Override
public boolean accepts( NamedMessage message, MessageHandler metadata ) {
NamedMessageHandler namedMessageHandler = metadata.getAnnotation(NamedMessageHandler.class);
if ( namedMessageHandler != null ) {
return Arrays.asList( namedMessageHandler.value() ).contains( message.getName() );
}
static class NamedMessage
{
private String name;
EnvelopedNamedMessageHandler envelopedHandler = metadata.getAnnotation(EnvelopedNamedMessageHandler.class);
return envelopedHandler != null && Arrays.asList( envelopedHandler.value() ).contains( message.getName() );
NamedMessage( String name ) {
this.name = name;
}
}
}
public String getName() {
return this.name;
}
}
static class NamedMessage
{
private String name;
static class NamedMessageListener
{
final Set<NamedMessage> handledByOne = new HashSet<NamedMessage>();
final Set<NamedMessage> handledByTwo = new HashSet<NamedMessage>();
final Set<NamedMessage> handledByThree = new HashSet<NamedMessage>();
NamedMessage( String name ) {
this.name = name;
}
@NamedMessageHandler({ "messageOne", "messageTwo" })
void handlerOne( NamedMessage message ) {
this.handledByOne.add( message );
}
public String getName() {
return name;
}
}
@MessageThree
void handlerThree( NamedMessage message ) {
this.handledByThree.add( message );
}
}
static class NamedMessageListener
{
final Set<NamedMessage> handledByOne = new HashSet<NamedMessage>();
final Set<NamedMessage> handledByTwo = new HashSet<NamedMessage>();
final Set<NamedMessage> handledByThree = new HashSet<NamedMessage>();
@Test
public void testMetaHandlerFiltering() {
MBassador bus = createBus(SyncAsync());
@NamedMessageHandler({ "messageOne", "messageTwo" })
void handlerOne( NamedMessage message ) {
handledByOne.add( message );
}
NamedMessageListener listener = new NamedMessageListener();
bus.subscribe( listener );
@EnvelopedNamedMessageHandler({ "messageTwo", "messageThree" })
void handlerTwo( MessageEnvelope envelope ) {
handledByTwo.add( (NamedMessage) envelope.getMessage() );
}
NamedMessage messageOne = new NamedMessage( "messageOne" );
NamedMessage messageTwo = new NamedMessage( "messageTwo" );
NamedMessage messageThree = new NamedMessage( "messageThree" );
@MessageThree
void handlerThree( NamedMessage message ) {
handledByThree.add( message );
}
}
@Test
public void testMetaHandlerFiltering() {
MBassador bus = createBus(SyncAsync());
NamedMessageListener listener = new NamedMessageListener();
bus.subscribe( listener );
NamedMessage messageOne = new NamedMessage( "messageOne" );
NamedMessage messageTwo = new NamedMessage( "messageTwo" );
NamedMessage messageThree = new NamedMessage( "messageThree" );
bus.publish( messageOne );
bus.publish( messageTwo );
bus.publish( messageThree );
bus.publish( messageOne );
bus.publish( messageTwo );
bus.publish( messageThree );
assertEquals(2, listener.handledByOne.size());
assertTrue( listener.handledByOne.contains( messageOne ) );
assertTrue(listener.handledByOne.contains(messageTwo));
assertTrue( listener.handledByOne.contains( messageOne ) );
assertTrue(listener.handledByOne.contains(messageTwo));
assertEquals(2, listener.handledByTwo.size());
assertTrue( listener.handledByTwo.contains( messageTwo ) );
assertTrue( listener.handledByTwo.contains( messageThree ) );
assertTrue( listener.handledByTwo.contains( messageTwo ) );
assertTrue( listener.handledByTwo.contains( messageThree ) );
assertEquals(1, listener.handledByThree.size());
assertTrue( listener.handledByThree.contains( messageThree ) );
}
assertTrue( listener.handledByThree.contains( messageThree ) );
}
}

View File

@ -1,5 +1,7 @@
package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.common.ConcurrentExecutor;
@ -10,11 +12,10 @@ import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.listeners.ObjectListener;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Verify correct behaviour in case of message publications that do not have any matching subscriptions
*
@ -23,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class DeadMessageTest extends MessageBusTest{
@Override
@Before
public void beforeTest(){
DeadMessagHandler.deadMessages.set(0);
@ -34,10 +36,8 @@ public class DeadMessageTest extends MessageBusTest{
final MBassador bus = createBus(SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.AsyncListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, DeadMessagHandler.class)
.create(InstancesPerListener, Object.class);

View File

@ -1,150 +0,0 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Testing of filter functionality
*
* @author bennidi
* Date: 11/26/12
*/
public class FilterTest extends MessageBusTest {
private static final AtomicInteger FilteredEventCounter = new AtomicInteger(0);
private static final AtomicInteger DeadEventCounter = new AtomicInteger(0);
@Test
public void testSubclassFilter() throws Exception {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = createBus(SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.getAll();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage message = new TestMessage();
TestMessage subTestMessage = new SubTestMessage();
bus.post(message).now();
bus.post(subTestMessage).now();
assertEquals(100, message.counter.get());
assertEquals(0, subTestMessage.counter.get());
assertEquals(100, FilteredEventCounter.get());
}
@Test
public void testFilteredFilteredEvent() throws Exception {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = createBus(SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.getAll();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
bus.post(new Object()).now();
bus.post(new SubTestMessage()).now();
assertEquals(100, FilteredEventCounter.get()); // the SubTestMessage should have been republished as a filtered event
assertEquals(100, DeadEventCounter.get()); // Object.class was filtered and the fil
}
public static class FilteredMessageListener{
// NOTE: Use rejectSubtypes property of @Handler to achieve the same functionality but with better performance
// and more concise syntax
@Handler(filters = {@Filter(Filters.RejectSubtypes.class)})
public void handleTestMessage(TestMessage message){
message.counter.incrementAndGet();
}
// FilteredEvents that contain messages of class Object will be filtered (again) and should cause a DeadEvent to be thrown
@Handler(filters = {@Filter(RejectFilteredObjects.class)})
public void handleFilteredEvent(FilteredMessage filtered){
FilteredEventCounter.incrementAndGet();
}
// will cause republication of a FilteredEvent
@Handler(filters = {@Filter(RejectAll.class)})
public void handleNone(Object any){
FilteredEventCounter.incrementAndGet();
}
// will cause republication of a FilteredEvent
@Handler
public void handleDead(DeadMessage dead){
DeadEventCounter.incrementAndGet();
}
}
@Test
public void testSubtypesOnly(){
MBassador bus = createBus(SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(100, TestMessageHandler.class);
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage supertype = new TestMessage();
TestMessage subtype = new SubTestMessage();
bus.publish(supertype);
bus.publish(subtype);
assertEquals(100, subtype.counter.get());
assertEquals(0, supertype.counter.get());
}
public static class TestMessageHandler{
@Handler(filters = @Filter(Filters.SubtypesOnly.class))
public void handle(TestMessage message){
message.counter.incrementAndGet();
}
}
public static class RejectFilteredObjects implements IMessageFilter{
@Override
public boolean accepts(Object message, MessageHandler metadata) {
if(message.getClass().equals(FilteredMessage.class) && ((FilteredMessage)message).getMessage().getClass().equals(Object.class)){
return false;
}
return true;
}
}
public static final class RejectAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandler metadata) {
return false;
}
}
}

View File

@ -1,17 +1,24 @@
package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.MessageManager;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.Listeners;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
@ -57,42 +64,10 @@ public class MBassadorTest extends MessageBusTest {
}
@Test
public void testSyncPublicationAsyncHandlers() throws Exception {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = createBus(SyncAsync(), listeners);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = messageManager.create(StandardMessage.class, InstancesPerListener, Listeners.join(Listeners.asynchronous(), Listeners.handlesStandardMessage()));
MultipartMessage multipartMessage = messageManager.create(MultipartMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class, IMultipartMessageListener.AsyncListener.class);
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
messageManager.waitForMessages(processingTimeInMS);
MessageTypes.resetAll();
messageManager.register(MessageTypes.Simple, InstancesPerListener * ConcurrentUnits, IMessageListener.AsyncListener.class, MessagesListener.AsyncListener.class);
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
messageManager.waitForMessages(processingTimeInMS);
}
@Test
public void testAsynchronousMessagePublication() throws Exception {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = createBus(SyncAsync(), listeners);
@ -102,11 +77,7 @@ public class MBassadorTest extends MessageBusTest {
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = messageManager.create(StandardMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class);
MultipartMessage multipartMessage = messageManager.create(MultipartMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class);
bus.post(standardMessage).asynchronously();
bus.post(multipartMessage).asynchronously();
bus.post(MessageTypes.Simple).asynchronously();
}

View File

@ -1,19 +1,15 @@
package net.engio.mbassy;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.listener.MessageListener;
import org.junit.Test;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.MessageEnvelope;
import java.io.BufferedReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static net.engio.mbassy.listener.MessageListener.ForMessage;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MessageListener;
import net.engio.mbassy.listener.MetadataReader;
import org.junit.Test;
/**
*
@ -26,7 +22,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithoutInheritance() {
MessageListener<MessageListener1> listener = reader.getMessageListener(MessageListener1.class);
MessageListener listener = this.reader.getMessageListener(MessageListener1.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -45,7 +41,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithInheritance() {
MessageListener<MessageListener2> listener = reader.getMessageListener(MessageListener2.class);
MessageListener listener = this.reader.getMessageListener(MessageListener2.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -55,7 +51,7 @@ public class MetadataReaderTest extends AssertSupport {
@Test
public void testListenerWithInheritanceOverriding() {
MessageListener<MessageListener3> listener = reader.getMessageListener(MessageListener3.class);
MessageListener listener = this.reader.getMessageListener(MessageListener3.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(0, String.class)
@ -64,53 +60,26 @@ public class MetadataReaderTest extends AssertSupport {
validator.check(listener);
}
@Test
public void testEnveloped() {
MessageListener<EnvelopedListener> listener = reader.getMessageListener(EnvelopedListener.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(1, String.class)
.expectHandlers(2, Integer.class)
.expectHandlers(2, Long.class)
.expectHandlers(1, Double.class)
.expectHandlers(1, Number.class)
.expectHandlers(0, List.class);
validator.check(listener);
}
@Test
public void testEnvelopedSubclass() {
MessageListener<EnvelopedListenerSubclass> listener = reader.getMessageListener(EnvelopedListenerSubclass.class);
ListenerValidator validator = new ListenerValidator()
.expectHandlers(1, String.class)
.expectHandlers(2, Integer.class)
.expectHandlers(1, Long.class)
.expectHandlers(0, Double.class)
.expectHandlers(0, Number.class);
validator.check(listener);
}
private class ListenerValidator {
private Map<Class<?>, Integer> handlers = new HashMap<Class<?>, Integer>();
public ListenerValidator expectHandlers(Integer count, Class<?> messageType){
handlers.put(messageType, count);
this.handlers.put(messageType, count);
return this;
}
public void check(MessageListener listener){
for(Map.Entry<Class<?>, Integer> expectedHandler: handlers.entrySet()){
for(Map.Entry<Class<?>, Integer> expectedHandler: this.handlers.entrySet()){
if(expectedHandler.getValue() > 0){
assertTrue(listener.handles(expectedHandler.getKey()));
}
else{
assertFalse(listener.handles(expectedHandler.getKey()));
}
assertEquals(expectedHandler.getValue(), listener.getHandlers(ForMessage(expectedHandler.getKey())).size());
assertEquals(expectedHandler.getValue(), listener.getHandlers(expectedHandler.getKey()).size());
}
}
}
@ -141,6 +110,7 @@ public class MetadataReaderTest extends AssertSupport {
public class MessageListener2 extends MessageListener1 {
// redefine handler implementation (not configuration)
@Override
public void handleString(String s) {
}
@ -150,59 +120,17 @@ public class MetadataReaderTest extends AssertSupport {
public class MessageListener3 extends MessageListener2 {
// narrow the handler
@Override
@Handler(rejectSubtypes = true)
public void handleAny(Object o) {
}
@Override
@Handler(enabled = false)
public void handleString(String s) {
}
}
public class EnvelopedListener{
@Handler(rejectSubtypes = true)
@Enveloped(messages = {String.class, Integer.class, Long.class})
public void handleEnveloped(MessageEnvelope o) {
}
@Handler
@Enveloped(messages = Number.class)
public void handleEnveloped2(MessageEnvelope o) {
}
}
public class EnvelopedListenerSubclass extends EnvelopedListener{
// narrow to integer
@Handler
@Enveloped(messages = Integer.class)
public void handleEnveloped2(MessageEnvelope o) {
}
}
public static interface ListenerInterface{
@Handler
@Enveloped(messages = Object.class)
void handle(MessageEnvelope envelope);
}
public class InterfacedListener implements ListenerInterface{
@Override
public void handle(MessageEnvelope envelope) {
//
}
}
}

View File

@ -1,18 +0,0 @@
package net.engio.mbassy;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.common.StrongConcurrentSet;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public class StrongConcurrentSetTest extends ConcurrentSetTest{
@Override
protected IConcurrentSet createSet() {
return new StrongConcurrentSet();
}
}

View File

@ -1,18 +1,33 @@
package net.engio.mbassy;
import java.util.Collections;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.SubscriptionValidator;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.messages.*;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.listeners.AbstractMessageListener;
import net.engio.mbassy.listeners.ICountableListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.IMultipartMessageListener;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.listeners.MultipartMessageListener;
import net.engio.mbassy.listeners.Overloading;
import net.engio.mbassy.listeners.StandardMessageListener;
import net.engio.mbassy.messages.AbstractMessage;
import net.engio.mbassy.messages.ICountable;
import net.engio.mbassy.messages.IMessage;
import net.engio.mbassy.messages.IMultipartMessage;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
import org.junit.Test;
/**
*
@ -32,15 +47,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testIMessageListener(){
ListenerFactory listeners = listeners(
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
IMessageListener.DisabledListener.class,
IMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMessageListener.DefaultListener.class).handles(IMessage.class,
AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.AsyncListener.class).handles(IMessage.class,
AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
runTestWith(listeners, expectedSubscriptions);
@ -50,14 +62,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testAbstractMessageListener(){
ListenerFactory listeners = listeners(
AbstractMessageListener.DefaultListener.class,
AbstractMessageListener.AsyncListener.class,
AbstractMessageListener.DisabledListener.class,
AbstractMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class)
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class)
.listener(AbstractMessageListener.AsyncListener.class).handles(StandardMessage.class, AbstractMessage.class);
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -66,14 +76,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testMessagesListener(){
ListenerFactory listeners = listeners(
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.DisabledListener.class,
MessagesListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class)
.listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class);
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -82,14 +90,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testMultipartMessageListener(){
ListenerFactory listeners = listeners(
MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.AsyncListener.class,
MultipartMessageListener.DisabledListener.class,
MultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class);
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -98,14 +104,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testIMultipartMessageListener(){
ListenerFactory listeners = listeners(
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.DisabledListener.class,
IMultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -114,14 +118,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testStandardMessageListener(){
ListenerFactory listeners = listeners(
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.DisabledListener.class,
StandardMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.AsyncListener.class).handles(StandardMessage.class);
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -130,14 +132,12 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testICountableListener(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
ICountableListener.DisabledListener.class,
ICountableListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class).handles(ICountable.class)
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(ICountableListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -146,42 +146,20 @@ public class SubscriptionManagerTest extends AssertSupport {
public void testMultipleMessageListeners(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
ICountableListener.DisabledListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.DisabledListener.class,
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.DisabledListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(ICountableListener.AsyncListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class)
.listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class);
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testStrongListenerSubscription() throws Exception {
ListenerFactory listeners = listeners(CustomInvocationListener.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
listeners.clear();
runGC();
Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(StandardMessage.class);
assertEquals(1, subscriptions.size());
for(Subscription sub : subscriptions)
assertEquals(InstancesPerListener, sub.size());
}
@Test
public void testOverloadedMessageHandlers(){
@ -199,19 +177,6 @@ public class SubscriptionManagerTest extends AssertSupport {
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testPrioritizedMessageHandlers(){
ListenerFactory listeners = listeners(PrioritizedListener.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(PrioritizedListener.class).handles(IMessage.class, IMessage.class, IMessage.class, IMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
private BusRuntime mockedRuntime(){
return new BusRuntime(null)
.add(BusRuntime.Properties.ErrorHandlers, Collections.EMPTY_SET)
@ -241,33 +206,4 @@ public class SubscriptionManagerTest extends AssertSupport {
}
/**
* define handlers with different priorities which need to be executed
* in their respective order
*/
public static class PrioritizedListener{
@Handler(priority = 1)
public void handlePrio1(IMessage message){
message.handled(this.getClass());
}
@Handler(priority = 2)
public void handlePrio2(IMessage message){
message.handled(this.getClass());
}
@Handler(priority = 3)
public void handlePrio3(IMessage message){
message.handled(this.getClass());
}
@Handler(priority = 4)
public void handlePrio4(IMessage message){
message.handled(this.getClass());
}
}
}

View File

@ -1,5 +1,7 @@
package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.GenericMessagePublicationSupport;
@ -9,19 +11,16 @@ import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listeners.CustomInvocationListener;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
@ -111,66 +110,6 @@ public abstract class SyncBusTest extends MessageBusTest {
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
}
@Test
public void testCustomHandlerInvocation(){
final GenericMessagePublicationSupport bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, CustomInvocationListener.class)
.create(InstancesPerListener, Object.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
assertEquals(InstancesPerListener * 2, standardMessage.getTimesHandled(CustomInvocationListener.class));
assertEquals(0, multipartMessage.getTimesHandled(CustomInvocationListener.class));
assertEquals(0, MessageTypes.Simple.getTimesHandled(CustomInvocationListener.class));
}
};
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
// multi threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
}
@Test
public void testHandlerPriorities(){
final GenericMessagePublicationSupport bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, PrioritizedListener.class)
.create(InstancesPerListener, Object.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
bus.post(new IncrementingMessage()).now();
}
};
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
// multi threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
}
public static class MBassadorTest extends SyncBusTest {
@ -199,37 +138,12 @@ public abstract class SyncBusTest extends MessageBusTest {
public void markHandled(int newVal){
// only transitions by the next handler are allowed
if(count == newVal || count + 1 == newVal) count = newVal;
else Assert.fail("Message was handled out of order");
if(this.count == newVal || this.count + 1 == newVal) {
this.count = newVal;
} else {
Assert.fail("Message was handled out of order");
}
}
}
public static class PrioritizedListener{
@Handler(priority = Integer.MIN_VALUE)
public void handle1(IncrementingMessage message) {
message.markHandled(1);
}
@Handler(priority = -2)
public void handle2(IncrementingMessage message) {
message.markHandled(2);
}
@Handler
public void handle3(IncrementingMessage message) {
message.markHandled(3);
}
@Handler(priority = Integer.MAX_VALUE)
public void handle4(IncrementingMessage message) {
message.markHandled(4);
}
}
}

View File

@ -1,17 +1,17 @@
package net.engio.mbassy;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.listener.Synchronized;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
import org.junit.Test;
/**
* Todo: Add javadoc
@ -54,33 +54,6 @@ public class SynchronizedHandlerTest extends MessageBusTest {
}
@Test
public void testSynchronizedWithAsSynchronousInvocation(){
List<SynchronizedWithAsynchronousDelivery> handlers = new LinkedList<SynchronizedWithAsynchronousDelivery>();
IBusConfiguration config = SyncAsync();
config.getFeature(Feature.AsynchronousMessageDispatch.class)
.setNumberOfMessageDispatchers(6);
IMessageBus bus = createBus(config);
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithAsynchronousDelivery handler = new SynchronizedWithAsynchronousDelivery();
handlers.add(handler);
bus.subscribe(handler);
}
for(int i = 0; i < numberOfMessages; i++){
track(bus.post(new Object()).asynchronously());
}
pause(10000);
for(SynchronizedWithAsynchronousDelivery handler : handlers){
assertEquals(incrementsPerMessage * numberOfMessages, handler.counter);
}
}
public static class SynchronizedWithSynchronousDelivery {
private int counter = 0;
@ -89,23 +62,9 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Synchronized
public void handleMessage(Object o){
for(int i = 0; i < incrementsPerMessage; i++){
counter++;
this.counter++;
}
}
}
public static class SynchronizedWithAsynchronousDelivery {
private int counter = 0;
@Handler(delivery = Invoke.Asynchronously)
@Synchronized
public void handleMessage(Object o){
for(int i = 0; i < incrementsPerMessage; i++){
counter++;
}
}
}
}

View File

@ -1,14 +1,17 @@
package net.engio.mbassy.common;
import java.util.concurrent.ConcurrentHashMap;
import junit.framework.Assert;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.messages.MessageTypes;
import org.junit.Before;
/**
@ -36,14 +39,15 @@ public abstract class MessageBusTest extends AssertSupport {
}
};
private StrongConcurrentSet<IMessagePublication> issuedPublications = new StrongConcurrentSet<IMessagePublication>();
private static final Object mapObject = new Object();
private ConcurrentHashMap<IMessagePublication, Object> issuedPublications = new ConcurrentHashMap<IMessagePublication, Object>();
@Before
public void setUp(){
issuedPublications = new StrongConcurrentSet<IMessagePublication>();
for(MessageTypes mes : MessageTypes.values())
this.issuedPublications = new ConcurrentHashMap<IMessagePublication, Object>();
for(MessageTypes mes : MessageTypes.values()) {
mes.reset();
}
}
public static IBusConfiguration SyncAsync() {
@ -67,23 +71,25 @@ public abstract class MessageBusTest extends AssertSupport {
}
protected void track(IMessagePublication asynchronously) {
issuedPublications.add(asynchronously);
this.issuedPublications.put(asynchronously, mapObject);
}
public void waitForPublications(long timeOutInMs){
long start = System.currentTimeMillis();
while(issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){
for(IMessagePublication pub : issuedPublications){
if(pub.isFinished())
issuedPublications.remove(pub);
while(this.issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){
for(IMessagePublication pub : this.issuedPublications.keySet()){
if(pub.isFinished()) {
this.issuedPublications.remove(pub);
}
}
}
if(issuedPublications.size() > 0)
if(this.issuedPublications.size() > 0) {
fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms");
}
}
public void addPublication(IMessagePublication publication){
issuedPublications.add(publication);
this.issuedPublications.put(publication, mapObject);
}
}

View File

@ -1,8 +1,9 @@
package net.engio.mbassy.common;
import net.engio.mbassy.messages.IMessage;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.messages.IMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,11 +16,11 @@ import org.slf4j.LoggerFactory;
* To change this template use File | Settings | File Templates.
*/
public class MessageManager {
private static final Logger LOG =
LoggerFactory.getLogger(MessageManager.class);
private static final Logger LOG =
LoggerFactory.getLogger(MessageManager.class);
private StrongConcurrentSet<MessageContext> messages = new StrongConcurrentSet();
private static final Object mapObject = new Object();
private ConcurrentHashMap<MessageContext, Object> messages = new ConcurrentHashMap<MessageContext, Object>();
public <T extends IMessage> T create(Class<T> messageType, int expectedCount, Class ...listeners){
@ -46,7 +47,7 @@ public class MessageManager {
public <T extends IMessage> void register(T message, int expectedCount, Class ...listeners){
try {
messages.add(new MessageContext(expectedCount, message, listeners));
this.messages.put(new MessageContext(expectedCount, message, listeners), mapObject);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -54,7 +55,7 @@ public class MessageManager {
public <T extends IMessage> void register(T message, int expectedCount, Collection<Class> listeners){
try {
messages.add(new MessageContext(expectedCount, message, listeners));
this.messages.put(new MessageContext(expectedCount, message, listeners), mapObject);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -62,9 +63,9 @@ public class MessageManager {
public void waitForMessages(int timeoutInMs){
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < timeoutInMs && messages.size() > 0){
while(System.currentTimeMillis() - start < timeoutInMs && this.messages.size() > 0){
// check each created message once
for(MessageContext messageCtx : messages){
for(MessageContext messageCtx : this.messages.keySet()){
boolean handledCompletely = true;
for(Class listener : messageCtx.getListeners()){
handledCompletely &= messageCtx.getMessage().getTimesHandled(listener) == messageCtx.getExpectedCount();
@ -72,14 +73,14 @@ public class MessageManager {
// remove the ones that were handled as expected
if(handledCompletely){
logSuccess(messageCtx);
messages.remove(messageCtx);
this.messages.remove(messageCtx);
}
}
pause(100);
}
if(messages.size() > 0){
logFailingMessages(messages);
if(this.messages.size() > 0){
logFailingMessages(this.messages);
throw new RuntimeException("Message were not fully processed in given time");
}
@ -100,11 +101,12 @@ public class MessageManager {
private void logFailingMessages(StrongConcurrentSet<MessageContext> failing){
private void logFailingMessages(ConcurrentHashMap<MessageContext, Object> failing){
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Failing messages:\n");
for(MessageContext failingMessage : failing)
for(MessageContext failingMessage : failing.keySet()) {
errorMessage.append(failingMessage);
}
LOG.info(errorMessage.toString());
}
@ -127,20 +129,20 @@ public class MessageManager {
}
private long getExpectedCount() {
return expectedCount;
return this.expectedCount;
}
private IMessage getMessage() {
return message;
return this.message;
}
private Class[] getListeners() {
return listeners;
return this.listeners;
}
private String printListeners(){
StringBuilder listenersAsString = new StringBuilder();
for(Class listener : listeners){
for(Class listener : this.listeners){
listenersAsString.append(listener.getName());
listenersAsString.append(",");
}
@ -150,8 +152,8 @@ public class MessageManager {
@Override
public String toString() {
// TODO: actual count of listeners
return message.getClass().getSimpleName() + "{" +
"expectedCount=" + expectedCount +
return this.message.getClass().getSimpleName() + "{" +
"expectedCount=" + this.expectedCount +
", listeners=" + printListeners() +
'}';
}

View File

@ -1,10 +1,14 @@
package net.engio.mbassy.common;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.*;
/**
*
* @author bennidi
@ -26,18 +30,17 @@ public class SubscriptionValidator extends AssertSupport{
}
private SubscriptionValidator expect(Class subscriber, Class messageType){
validations.add(new ValidationEntry(messageType, subscriber));
messageTypes.add(messageType);
this.validations.add(new ValidationEntry(messageType, subscriber));
this.messageTypes.add(messageType);
return this;
}
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager){
for(Class messageType : messageTypes){
for(Class messageType : this.messageTypes){
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
ensureOrdering(subscriptions);
Collection<ValidationEntry> validationEntries = getEntries(EntriesByMessageType(messageType));
Collection<ValidationEntry> validationEntries = getEntries(messageType);
assertEquals(subscriptions.size(), validationEntries.size());
for(ValidationEntry validationValidationEntry : validationEntries){
Subscription matchingSub = null;
@ -49,38 +52,23 @@ public class SubscriptionValidator extends AssertSupport{
}
}
assertNotNull(matchingSub);
assertEquals(subscribedListener.getNumberOfListeners(validationValidationEntry.subscriber), matchingSub.size());
assertEquals(this.subscribedListener.getNumberOfListeners(validationValidationEntry.subscriber), matchingSub.size());
}
}
}
private void ensureOrdering(Collection<Subscription> subscriptions){
int lastPriority = Integer.MAX_VALUE;// highest priority possible
for(Subscription sub : subscriptions){
assertTrue("Subscriptions should be ordered by priority (DESC)", lastPriority >= sub.getPriority());
lastPriority = sub.getPriority();
}
}
private Collection<ValidationEntry> getEntries(IPredicate<ValidationEntry> filter){
private Collection<ValidationEntry> getEntries(Class messageType){
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
for (ValidationEntry validationValidationEntry : validations){
if(filter.apply(validationValidationEntry))matching.add(validationValidationEntry);
for (ValidationEntry validationValidationEntry : this.validations){
if (validationValidationEntry.messageType.equals(messageType)) {
matching.add(validationValidationEntry);
}
}
return matching;
}
private IPredicate<ValidationEntry> EntriesByMessageType(final Class messageType){
return new IPredicate<ValidationEntry>() {
@Override
public boolean apply(ValidationEntry target) {
return target.messageType.equals(messageType);
}
};
}
public class Expectation{
@ -91,8 +79,9 @@ public class SubscriptionValidator extends AssertSupport{
}
public SubscriptionValidator handles(Class ...messages){
for(Class message : messages)
expect(listener, message);
for(Class message : messages) {
expect(this.listener, message);
}
return SubscriptionValidator.this;
}
}

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.AbstractMessage;
/**
@ -22,6 +21,7 @@ public class AbstractMessageListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(AbstractMessage message){
super.handle(message);
}
@ -29,24 +29,17 @@ public class AbstractMessageListener {
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true, priority = 4)
@Override
@Handler(rejectSubtypes = true)
public void handle(AbstractMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously, priority = Integer.MAX_VALUE)
public void handle(AbstractMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(AbstractMessage message){
super.handle(message);

View File

@ -1,37 +0,0 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.StandardMessage;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* @author bennidi
* Date: 5/25/13
*/
@Listener(references = References.Strong)
public class CustomInvocationListener {
// this handler will be invoked asynchronously
@Handler(invocation = HandleSubTestEventInvocation.class)
public void handle(StandardMessage message) {
message.handled(this.getClass());
message.handled(this.getClass());
}
public static class HandleSubTestEventInvocation extends HandlerInvocation<CustomInvocationListener, StandardMessage> {
public HandleSubTestEventInvocation(SubscriptionContext context) {
super(context);
}
@Override
public void invoke(CustomInvocationListener listener, StandardMessage message) {
listener.handle(message);
}
}
}

View File

@ -2,14 +2,13 @@ package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.StandardMessage;
/**
* @author bennidi
* Date: 5/25/13
*/
@Listener(references = References.Strong)
@Listener()
public class ExceptionThrowingListener {

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.ICountable;
/**
@ -22,6 +21,7 @@ public class ICountableListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(ICountable message){
super.handle(message);
}
@ -29,6 +29,7 @@ public class ICountableListener {
public static class NoSubtypesListener extends BaseListener {
@Override
@Handler(rejectSubtypes = true)
public void handle(ICountable message){
super.handle(message);
@ -36,17 +37,9 @@ public class ICountableListener {
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(ICountable message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(ICountable message){
super.handle(message);

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.IMessage;
/**
@ -22,6 +21,7 @@ public class IMessageListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(IMessage message){
super.handle(message);
}
@ -29,6 +29,7 @@ public class IMessageListener {
public static class NoSubtypesListener extends BaseListener {
@Override
@Handler(rejectSubtypes = true)
public void handle(IMessage message){
super.handle(message);
@ -36,17 +37,9 @@ public class IMessageListener {
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(IMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(IMessage message){
super.handle(message);

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.IMultipartMessage;
/**
@ -22,6 +21,7 @@ public class IMultipartMessageListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(IMultipartMessage message){
super.handle(message);
}
@ -29,25 +29,18 @@ public class IMultipartMessageListener {
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true, priority = Integer.MIN_VALUE)
@Override
@Handler(rejectSubtypes = true)
public void handle(IMultipartMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously, priority = Integer.MIN_VALUE)
public void handle(IMultipartMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false , priority = 4)
@Override
@Handler(enabled = false)
public void handle(IMultipartMessage message){
super.handle(message);
}

View File

@ -1,6 +1,11 @@
package net.engio.mbassy.listeners;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Created with IntelliJ IDEA.
@ -19,14 +24,6 @@ public class Listeners {
ICountableListener.DefaultListener.class,
IMultipartMessageListener.DefaultListener.class}));
private static final List<Class> Asynchronous = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.AsyncListener.class,
IMessageListener.AsyncListener.class,
StandardMessageListener.AsyncListener.class,
MultipartMessageListener.AsyncListener.class,
ICountableListener.AsyncListener.class,
IMultipartMessageListener.AsyncListener.class}));
private static final List<Class> SubtypeRejecting = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.NoSubtypesListener.class,
IMessageListener.NoSubtypesListener.class,
@ -47,28 +44,20 @@ public class Listeners {
private static final List<Class> HandlesIMessage = Collections.unmodifiableList(Arrays.asList(new Class[]{
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
IMessageListener.NoSubtypesListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.NoSubtypesListener.class,
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.NoSubtypesListener.class,
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.NoSubtypesListener.class,
MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.AsyncListener.class,
MultipartMessageListener.NoSubtypesListener.class}));
private static final List<Class> HandlesStandardessage = Collections.unmodifiableList(Arrays.asList(new Class[]{
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.NoSubtypesListener.class}));
@ -76,10 +65,6 @@ public class Listeners {
return Synchronous;
}
public static Collection<Class> asynchronous(){
return Asynchronous;
}
public static Collection<Class> subtypeRejecting(){
return SubtypeRejecting;
}
@ -99,10 +84,12 @@ public class Listeners {
public static Collection<Class> join(Collection<Class>...listenerSets){
Set<Class> join = new HashSet<Class>();
for(Collection<Class> listeners : listenerSets)
for(Collection<Class> listeners : listenerSets) {
join.addAll(listeners);
for(Collection<Class> listeners : listenerSets)
}
for(Collection<Class> listeners : listenerSets) {
join.retainAll(listeners);
}
return join;
}

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.MessageTypes;
/**
@ -22,6 +21,7 @@ public class MessagesListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(MessageTypes message){
super.handle(message);
}
@ -29,6 +29,7 @@ public class MessagesListener {
public static class NoSubtypesListener extends BaseListener {
@Override
@Handler(rejectSubtypes = true)
public void handle(MessageTypes message){
super.handle(message);
@ -36,17 +37,9 @@ public class MessagesListener {
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(MessageTypes message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(MessageTypes message){
super.handle(message);

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.MultipartMessage;
/**
@ -22,6 +21,7 @@ public class MultipartMessageListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(MultipartMessage message){
super.handle(message);
}
@ -29,6 +29,7 @@ public class MultipartMessageListener {
public static class NoSubtypesListener extends BaseListener {
@Override
@Handler(rejectSubtypes = true)
public void handle(MultipartMessage message){
super.handle(message);
@ -36,17 +37,9 @@ public class MultipartMessageListener {
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(MultipartMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(MultipartMessage message){
super.handle(message);

View File

@ -1,19 +1,19 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.listener.Handler;
public class ObjectListener {
private List handledMessages = Collections.synchronizedList(new LinkedList());
@Handler(priority = Integer.MAX_VALUE)
@Handler()
public void handle(Object message){
handledMessages.add(message);
this.handledMessages.add(message);
}
}

View File

@ -2,7 +2,6 @@ package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.AbstractMessage;
/**
@ -25,7 +24,7 @@ public class Overloading {
}
@Listener(references = References.Strong)
@Listener()
public static class ListenerBase {

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.StandardMessage;
/**
@ -13,7 +12,7 @@ public class StandardMessageListener {
private static abstract class BaseListener {
@Handler(priority = 3)
@Handler()
public void handle(StandardMessage message){
message.handled(this.getClass());
}
@ -22,6 +21,7 @@ public class StandardMessageListener {
public static class DefaultListener extends BaseListener {
@Override
public void handle(StandardMessage message){
super.handle(message);
}
@ -29,24 +29,17 @@ public class StandardMessageListener {
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true, priority = 4)
@Override
@Handler(rejectSubtypes = true)
public void handle(StandardMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously, priority = -10)
public void handle(StandardMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Override
@Handler(enabled = false)
public void handle(StandardMessage message){
super.handle(message);