refactorings. abstract base class. repackaging

This commit is contained in:
benni 2012-11-17 12:14:23 +01:00
parent 2ae0e5fae0
commit 9ce3ceb6f4
12 changed files with 401 additions and 456 deletions

View File

@ -0,0 +1,247 @@
package org.mbassy;
import org.mbassy.common.IPredicate;
import org.mbassy.common.ReflectionUtils;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MetadataReader;
import org.mbassy.subscription.Subscription;
import org.mbassy.subscription.SubscriptionDeliveryRequest;
import org.mbassy.subscription.SubscriptionFactory;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand> implements IMessageBus<T, P> {
// This predicate is used to find all message listeners (methods annotated with @Listener)
private static final IPredicate<Method> AllMessageListeners = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
}
};
// This is the default error handler it will simply log to standard out and
// print stack trace if available
protected static final class ConsoleLogger implements IPublicationErrorHandler {
@Override
public void handleError(PublicationError error) {
System.out.println(error);
if (error.getCause() != null) error.getCause().printStackTrace();
}
}
;
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor;
private MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage = new HashMap(50);
// all subscriptions per messageHandler type
// this list provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerListener = new HashMap(50);
// remember already processed classes that do not contain any listeners
private final Collection<Class> nonListeners = new HashSet();
// this handler will receive all errors that occur during message dispatch or message handling
private CopyOnWriteArrayList<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
// all threads that are available for asynchronous message dispatching
private final CopyOnWriteArrayList<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final LinkedBlockingQueue<SubscriptionDeliveryRequest<T>> pendingMessages = new LinkedBlockingQueue<SubscriptionDeliveryRequest<T>>();
private final SubscriptionFactory subscriptionFactory;
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
public void run() {
while (true) {
try {
pendingMessages.take().execute();
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
return;
}
}
}
});
dispatchers.add(dispatcher);
dispatcher.start();
}
}
public AbstractMessageBus() {
this(2);
}
public AbstractMessageBus(int dispatcherThreadCount) {
this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}
public AbstractMessageBus(int dispatcherThreadCount, ExecutorService executor) {
this.executor = executor;
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
addErrorHandler(new ConsoleLogger());
subscriptionFactory = getSubscriptionFactory();
initialize();
}
protected abstract SubscriptionFactory getSubscriptionFactory();
protected void initialize(){}
@Override
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers);
}
public void unsubscribe(Object listener) {
if (listener == null) return;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if (subscriptions == null) return;
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
public void subscribe(Object listener) {
try {
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass))
return; // early reject of known classes that do not participate in eventing
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially for each class
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = getListeners(listeningClass); // get all methods with subscriptions
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
if (messageHandlers.isEmpty()) { // remember the class as non listening class
nonListeners.add(listeningClass);
return;
}
// create subscriptions for all detected listeners
for (Method messageHandler : messageHandlers) {
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
Class eventType = getMessageType(messageHandler);
Subscription subscription = subscriptionFactory.createSubscription(metadataReader.getHandlerMetadata(messageHandler));
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
//updateMessageTypeHierarchy(eventType);
}
subscriptionsPerListener.put(listeningClass, subscriptionsByListener);
}
}
}
// register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void addErrorHandler(IPublicationErrorHandler handler) {
errorHandlers.add(handler);
}
protected void addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) {
pendingMessages.offer(request);
}
// obtain the set of subscriptions for the given message type
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : getSuperclasses(messageType)) {
if (subscriptionsPerMessage.get(eventSuperType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
// IMPROVEMENT: use tree list that sorts during insertion
//Collections.sort(subscriptions, new SubscriptionByPriorityDesc());
return subscriptions;
}
private Collection<Class> getSuperclasses(Class from) {
Collection<Class> superclasses = new LinkedList<Class>();
while (!from.equals(Object.class)) {
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
return superclasses;
}
// associate a suscription with a message type
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArraySet<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
}
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
return true;
}
private static Class getMessageType(Method listener) {
return listener.getParameterTypes()[0];
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
private static List<Method> getListeners(Class<?> target) {
return ReflectionUtils.getMethods(AllMessageListeners, target);
}
public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers)
errorHandler.handleError(error);
}
@Override
protected void finalize() throws Throwable {
super.finalize();
for (Thread dispatcher : dispatchers) {
dispatcher.interrupt();
}
}
@Override
public Executor getExecutor() {
return executor;
}
}

View File

@ -1,5 +1,8 @@
package org.mbassy;
import java.util.Collection;
import java.util.concurrent.Executor;
/**
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
@ -69,6 +72,25 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*/
public P post(T message);
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*
* @param errorHandler
*/
public void addErrorHandler(IPublicationErrorHandler errorHandler);
/**
* Returns an immutable collection containing all the registered error handlers
*
* @return
*/
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
public Executor getExecutor();
public static interface IPostCommand{

View File

@ -1,4 +1,4 @@
package org.mbassy.common;
package org.mbassy;
/**
* TODO. Insert class description here

View File

@ -1,102 +1,32 @@
package org.mbassy;
import org.mbassy.filter.Filter;
import org.mbassy.filter.MessageFilter;
import org.mbassy.common.*;
import org.mbassy.subscription.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
public class MBassador<T> implements IMessageBus<T, SimplePostCommand>{
// This predicate is used to find all message listeners (methods annotated with @Listener)
private static final IPredicate<Method> AllMessageListeners = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
}
};
// This is the default error handler it will simply log to standard out and
// print stack trace if available
protected static final class ConsoleLogger implements IPublicationErrorHandler {
@Override
public void handleError(PublicationError error) {
System.out.println(error);
if (error.getCause() != null) error.getCause().printStackTrace();
}
};
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor;
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage = new HashMap(50);
// all subscriptions per messageHandler type
// this list provides fast access for subscribing and unsubscribing
private final Map<Class, Collection<Subscription>> subscriptionsPerListener = new HashMap(50);
// remember already processed classes that do not contain any listeners
private final Collection<Class> nonListeners = new HashSet();
// this handler will receive all errors that occur during message dispatch or message handling
private IPublicationErrorHandler errorHandler = new ConsoleLogger();
// all threads that are available for asynchronous message dispatching
private final CopyOnWriteArrayList<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final LinkedBlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>();
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
public void run() {
while (true) {
try {
publish(pendingMessages.take());
} catch (InterruptedException e) {
errorHandler.handleError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
return;
}
}
}
});
dispatchers.add(dispatcher);
dispatcher.start();
}
}
public class MBassador<T> extends AbstractMessageBus<T, SimplePostCommand<T>>{
public MBassador(){
this(2);
}
public MBassador(int dispatcherThreadCount){
this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
super(dispatcherThreadCount);
}
public MBassador(int dispatcherThreadCount, ExecutorService executor){
this.executor = executor;
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
super(dispatcherThreadCount,executor);
}
@Override
protected SubscriptionFactory getSubscriptionFactory() {
return new SubscriptionFactory(this);
}
public void publishAsync(T message){
pendingMessages.offer(message);
addAsynchronousDeliveryRequest(new SubscriptionDeliveryRequest<T>(getSubscriptionsByMessageType(message.getClass()), message));
}
@ -125,370 +55,9 @@ public class MBassador<T> implements IMessageBus<T, SimplePostCommand>{
}
public void unsubscribe(Object listener){
if (listener == null) return;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if(subscriptions == null)return;
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
@Override
public SimplePostCommand post(T message) {
return new SimplePostCommand(this, message);
}
public void subscribe(Object listener){
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass))
return; // early reject of known classes that do not participate in eventing
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially for each class
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = getListeners(listeningClass); // get all methods with subscriptions
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
if (messageHandlers.isEmpty()) { // remember the class as non listening class
nonListeners.add(listeningClass);
return;
}
// create subscriptions for all detected listeners
for (Method messageHandler : messageHandlers) {
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class));
Class eventType = getMessageType(messageHandler);
Subscription subscription = createSubscription(messageHandler, filter);
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
//updateMessageTypeHierarchy(eventType);
}
subscriptionsPerListener.put(listeningClass, subscriptionsByListener);
}
}
}
// register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
}
public void setErrorHandler(IPublicationErrorHandler handler){
this.errorHandler = handler;
}
// obtain the set of subscriptions for the given message type
private Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
List<Subscription> subscriptions = new LinkedList<Subscription>();
if(subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : getSuperclasses(messageType)){
if(subscriptionsPerMessage.get(eventSuperType) != null){
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
// IMPROVEMENT: use tree list that sorts during insertion
//Collections.sort(subscriptions, new SubscriptionByPriorityDesc());
return subscriptions;
}
private Collection<Class> getSuperclasses(Class from){
Collection<Class> superclasses = new LinkedList<Class>();
while(!from.equals(Object.class)){
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
return superclasses;
}
// associate a suscription with a message type
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArraySet<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
}
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
return true;
}
private static Class getMessageType(Method listener) {
return listener.getParameterTypes()[0];
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
private static List<Method> getListeners(Class<?> target) {
return ReflectionUtils.getMethods(AllMessageListeners, target);
}
// retrieve all instances of filters associated with the given subscription
private MessageFilter[] getFilter(Listener subscription) {
if (subscription.value().length == 0) return null;
MessageFilter[] filters = new MessageFilter[subscription.value().length];
int i = 0;
for (Filter filterDef : subscription.value()) {
MessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
try {
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error retrieving filter"));
}
}
filters[i] = filter;
i++;
}
return filters;
}
private void handlePublicationError(PublicationError error) {
errorHandler.handleError(error);
}
@Override
protected void finalize() throws Throwable {
super.finalize();
for(Thread dispatcher : dispatchers){
dispatcher.interrupt();
}
}
private Subscription createSubscription(Method messageHandler, MessageFilter[] filter){
if(filter == null || filter.length == 0){
if(isAsynchronous(messageHandler)){
return new UnfilteredAsynchronousSubscription(messageHandler);
}
else{
return new UnfilteredSynchronousSubscription(messageHandler);
}
}
else{
if(isAsynchronous(messageHandler)){
return new FilteredAsynchronousSubscription(messageHandler, filter);
}
else{
return new FilteredSynchronousSubscription(messageHandler, filter);
}
}
}
private boolean isAsynchronous(Method messageHandler){
return messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous);
}
/**
* Subscription is a thread safe container for objects that contain message handlers
*/
private abstract class Subscription {
private final Method messageHandler;
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
private int priority = 0;
private Subscription(Method messageHandler) {
// TODO: init priority
this.messageHandler = messageHandler;
this.messageHandler.setAccessible(true);
}
protected abstract void publish(Object message);
protected abstract void dispatch(final Object message, final Object listener);
public int getPriority(){
return priority;
}
public void subscribe(Object o) {
listeners.add(o);
}
protected void invokeHandler(final Object message, final Object listener){
try {
messageHandler.invoke(listener, message);
}catch(IllegalAccessException e){
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
messageHandler, listener, message));
}
catch(IllegalArgumentException e){
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + messageHandler.getParameterTypes()[0],
messageHandler, listener, message));
}
catch (InvocationTargetException e) {
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
messageHandler, listener, message));
}
catch (Throwable e) {
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
messageHandler, listener, message));
}
}
public void unsubscribe(Object existingListener) {
listeners.remove(existingListener);
}
}
private abstract class UnfilteredSubscription extends Subscription{
private UnfilteredSubscription(Method messageHandler) {
super(messageHandler);
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
dispatch(message, listener);
}
}
}
private class UnfilteredAsynchronousSubscription extends UnfilteredSubscription{
private UnfilteredAsynchronousSubscription(Method messageHandler) {
super(messageHandler);
}
protected void dispatch(final Object message, final Object listener){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}
private class UnfilteredSynchronousSubscription extends UnfilteredSubscription{
private UnfilteredSynchronousSubscription(Method messageHandler) {
super(messageHandler);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}
private abstract class FilteredSubscription extends Subscription{
private final MessageFilter[] filter;
private FilteredSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler);
this.filter = filter;
}
private boolean passesFilter(Object message, Object listener) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, listener)) return false;
}
return true;
}
}
protected void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
if(passesFilter(message, listener)) {
dispatch(message, listener);
}
}
}
}
private class FilteredSynchronousSubscription extends FilteredSubscription{
private FilteredSynchronousSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler, filter);
}
protected void dispatch(final Object message, final Object listener){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}
private class FilteredAsynchronousSubscription extends FilteredSubscription{
private FilteredAsynchronousSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler, filter);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}
private final class SubscriptionByPriorityDesc implements Comparator<Subscription> {
@Override
public int compare(Subscription o1, Subscription o2) {
return o1.getPriority() - o2.getPriority();
}
};
}

View File

@ -1,4 +1,4 @@
package org.mbassy.common;
package org.mbassy;
import java.lang.reflect.Method;

View File

@ -1,4 +1,4 @@
package org.mbassy.filter;
package org.mbassy.listener;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;

View File

@ -1,6 +1,4 @@
package org.mbassy;
import org.mbassy.filter.Filter;
package org.mbassy.listener;
import java.lang.annotation.*;
@ -16,12 +14,10 @@ import java.lang.annotation.*;
@Target(value = {ElementType.METHOD})
public @interface Listener {
Filter[] value() default {}; // no filters by default
Filter[] filters() default {}; // no filters by default
Dispatch mode() default Dispatch.Synchronous;
Mode dispatch() default Mode.Synchronous;
public static enum Dispatch{
Synchronous,Asynchronous
}
int priority() default 0;
}

View File

@ -1,4 +1,4 @@
package org.mbassy.filter;
package org.mbassy.listener;
/**
* Object filters can be used to prevent certain messages to be delivered to a specific listener.

View File

@ -0,0 +1,51 @@
package org.mbassy.listener;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
import org.mbassy.listener.MessageFilter;
import java.lang.reflect.Method;
/**
* User: benni
* Date: 11/14/12
*/
public class MessageHandlerMetadata {
private Method handler;
private MessageFilter[] filter;
private Listener listenerConfig;
private boolean isAsynchronous = false;
public MessageHandlerMetadata(Method handler, MessageFilter[] filter, Listener listenerConfig) {
this.handler = handler;
this.filter = filter;
this.listenerConfig = listenerConfig;
this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous);
}
public boolean isAsynchronous(){
return isAsynchronous;
}
public boolean isFiltered(){
return filter == null || filter.length == 0;
}
public int getPriority(){
return listenerConfig.priority();
}
public Method getHandler() {
return handler;
}
public MessageFilter[] getFilter() {
return filter;
}
}

View File

@ -0,0 +1,43 @@
package org.mbassy.listener;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/16/12
* Time: 10:22 AM
* To change this template use File | Settings | File Templates.
*/
public class MetadataReader {
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
// retrieve all instances of filters associated with the given subscription
private MessageFilter[] getFilter(Listener subscription) throws Exception{
if (subscription.filters().length == 0) return null;
MessageFilter[] filters = new MessageFilter[subscription.filters().length];
int i = 0;
for (Filter filterDef : subscription.filters()) {
MessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
}
filters[i] = filter;
i++;
}
return filters;
}
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) throws Exception{
Listener config = messageHandler.getAnnotation(Listener.class);
MessageFilter[] filter = getFilter(config);
return new MessageHandlerMetadata(messageHandler, filter, config);
}
}

View File

@ -0,0 +1,12 @@
package org.mbassy.listener;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/16/12
* Time: 10:01 AM
* To change this template use File | Settings | File Templates.
*/
public enum Mode {
Synchronous,Asynchronous
}

View File

@ -2,8 +2,10 @@ package org.mbassy;
import org.junit.Assert;
import org.junit.Test;
import org.mbassy.filter.Filter;
import org.mbassy.filter.MessageFilter;
import org.mbassy.listener.Filter;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.Mode;
import java.util.ArrayList;
import java.util.List;
@ -187,14 +189,17 @@ public class MBassadorTest {
}
// this handler will be invoked asynchronously
@Listener(mode = Listener.Dispatch.Asynchronous)
@Listener(priority = 0, dispatch = Mode.Asynchronous)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet();
}
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Listener({@Filter(MessageFilter.None.class),@Filter(MessageFilter.All.class)})
@Listener(
priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(MessageFilter.None.class),@Filter(MessageFilter.All.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
}