diff --git a/src/main/java/org/mbassy/AbstractMessageBus.java b/src/main/java/org/mbassy/AbstractMessageBus.java new file mode 100644 index 0000000..9747408 --- /dev/null +++ b/src/main/java/org/mbassy/AbstractMessageBus.java @@ -0,0 +1,247 @@ +package org.mbassy; + +import org.mbassy.common.IPredicate; +import org.mbassy.common.ReflectionUtils; +import org.mbassy.listener.Listener; +import org.mbassy.listener.MetadataReader; +import org.mbassy.subscription.Subscription; +import org.mbassy.subscription.SubscriptionDeliveryRequest; +import org.mbassy.subscription.SubscriptionFactory; + +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.*; + + +public abstract class AbstractMessageBus implements IMessageBus { + + + // This predicate is used to find all message listeners (methods annotated with @Listener) + private static final IPredicate AllMessageListeners = new IPredicate() { + @Override + public boolean apply(Method target) { + return target.getAnnotation(Listener.class) != null; + } + }; + + // This is the default error handler it will simply log to standard out and + // print stack trace if available + protected static final class ConsoleLogger implements IPublicationErrorHandler { + @Override + public void handleError(PublicationError error) { + System.out.println(error); + if (error.getCause() != null) error.getCause().printStackTrace(); + } + } + + ; + + // executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost + private ExecutorService executor; + + private MetadataReader metadataReader = new MetadataReader(); + + // all subscriptions per message type + // this is the primary list for dispatching a specific message + // write access is synchronized and happens very infrequently + private final Map> subscriptionsPerMessage = new HashMap(50); + + // all subscriptions per messageHandler type + // this list provides fast access for subscribing and unsubscribing + // write access is synchronized and happens very infrequently + private final Map> subscriptionsPerListener = new HashMap(50); + + // remember already processed classes that do not contain any listeners + private final Collection nonListeners = new HashSet(); + + // this handler will receive all errors that occur during message dispatch or message handling + private CopyOnWriteArrayList errorHandlers = new CopyOnWriteArrayList(); + + // all threads that are available for asynchronous message dispatching + private final CopyOnWriteArrayList dispatchers = new CopyOnWriteArrayList(); + + // all pending messages scheduled for asynchronous dispatch are queued here + private final LinkedBlockingQueue> pendingMessages = new LinkedBlockingQueue>(); + + private final SubscriptionFactory subscriptionFactory; + + // initialize the dispatch workers + private void initDispatcherThreads(int numberOfThreads) { + for (int i = 0; i < numberOfThreads; i++) { + // each thread will run forever and process incoming + //dispatch requests + Thread dispatcher = new Thread(new Runnable() { + public void run() { + while (true) { + try { + pendingMessages.take().execute(); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null)); + return; + } + } + } + }); + dispatchers.add(dispatcher); + dispatcher.start(); + } + } + + public AbstractMessageBus() { + this(2); + } + + public AbstractMessageBus(int dispatcherThreadCount) { + this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue())); + } + + public AbstractMessageBus(int dispatcherThreadCount, ExecutorService executor) { + this.executor = executor; + initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2); + addErrorHandler(new ConsoleLogger()); + subscriptionFactory = getSubscriptionFactory(); + initialize(); + } + + protected abstract SubscriptionFactory getSubscriptionFactory(); + + protected void initialize(){} + + @Override + public Collection getRegisteredErrorHandlers() { + return Collections.unmodifiableCollection(errorHandlers); + } + + public void unsubscribe(Object listener) { + if (listener == null) return; + Collection subscriptions = subscriptionsPerListener.get(listener.getClass()); + if (subscriptions == null) return; + for (Subscription subscription : subscriptions) { + subscription.unsubscribe(listener); + } + } + + + public void subscribe(Object listener) { + try { + Class listeningClass = listener.getClass(); + if (nonListeners.contains(listeningClass)) + return; // early reject of known classes that do not participate in eventing + Collection subscriptionsByListener = subscriptionsPerListener.get(listeningClass); + if (subscriptionsByListener == null) { // if the type is registered for the first time + synchronized (this) { // new subscriptions must be processed sequentially for each class + subscriptionsByListener = subscriptionsPerListener.get(listeningClass); + if (subscriptionsByListener == null) { // double check (a bit ugly but works here) + List messageHandlers = getListeners(listeningClass); // get all methods with subscriptions + subscriptionsByListener = new ArrayList(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only) + if (messageHandlers.isEmpty()) { // remember the class as non listening class + nonListeners.add(listeningClass); + return; + } + // create subscriptions for all detected listeners + for (Method messageHandler : messageHandlers) { + if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners + Class eventType = getMessageType(messageHandler); + Subscription subscription = subscriptionFactory.createSubscription(metadataReader.getHandlerMetadata(messageHandler)); + subscription.subscribe(listener); + addMessageTypeSubscription(eventType, subscription); + subscriptionsByListener.add(subscription); + //updateMessageTypeHierarchy(eventType); + } + subscriptionsPerListener.put(listeningClass, subscriptionsByListener); + } + } + } + // register the listener to the existing subscriptions + for (Subscription sub : subscriptionsByListener) sub.subscribe(listener); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + public void addErrorHandler(IPublicationErrorHandler handler) { + errorHandlers.add(handler); + } + + protected void addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest request) { + pendingMessages.offer(request); + } + + // obtain the set of subscriptions for the given message type + protected Collection getSubscriptionsByMessageType(Class messageType) { + Set subscriptions = new TreeSet(Subscription.SubscriptionByPriorityDesc); + + if (subscriptionsPerMessage.get(messageType) != null) { + subscriptions.addAll(subscriptionsPerMessage.get(messageType)); + } + for (Class eventSuperType : getSuperclasses(messageType)) { + if (subscriptionsPerMessage.get(eventSuperType) != null) { + subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType)); + } + } + // IMPROVEMENT: use tree list that sorts during insertion + //Collections.sort(subscriptions, new SubscriptionByPriorityDesc()); + return subscriptions; + } + + private Collection getSuperclasses(Class from) { + Collection superclasses = new LinkedList(); + while (!from.equals(Object.class)) { + superclasses.add(from.getSuperclass()); + from = from.getSuperclass(); + } + return superclasses; + } + + // associate a suscription with a message type + private void addMessageTypeSubscription(Class messageType, Subscription subscription) { + Collection subscriptions = subscriptionsPerMessage.get(messageType); + if (subscriptions == null) { + subscriptions = new CopyOnWriteArraySet(); + subscriptionsPerMessage.put(messageType, subscriptions); + } + subscriptions.add(subscription); + } + + + private boolean isValidMessageHandler(Method handler) { + if (handler.getParameterTypes().length != 1) { + // a messageHandler only defines one parameter (the message) + System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName() + + "]. A messageHandler must define exactly one parameter"); + return false; + } + return true; + } + + private static Class getMessageType(Method listener) { + return listener.getParameterTypes()[0]; + } + + // get all listeners defined by the given class (includes + // listeners defined in super classes) + private static List getListeners(Class target) { + return ReflectionUtils.getMethods(AllMessageListeners, target); + } + + + public void handlePublicationError(PublicationError error) { + for (IPublicationErrorHandler errorHandler : errorHandlers) + errorHandler.handleError(error); + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + for (Thread dispatcher : dispatchers) { + dispatcher.interrupt(); + } + } + + @Override + public Executor getExecutor() { + return executor; + } + +} diff --git a/src/main/java/org/mbassy/IMessageBus.java b/src/main/java/org/mbassy/IMessageBus.java index 768a259..adc8742 100644 --- a/src/main/java/org/mbassy/IMessageBus.java +++ b/src/main/java/org/mbassy/IMessageBus.java @@ -1,5 +1,8 @@ package org.mbassy; +import java.util.Collection; +import java.util.concurrent.Executor; + /** * * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched @@ -69,6 +72,25 @@ public interface IMessageBus { */ public P post(T message); + /** + * Publication errors may occur at various points of time during message delivery. A handler may throw an exception, + * may not be accessible due to security constraints or is not annotated properly. + * In any of all possible cases a publication error is created and passed to each of the registered error handlers. + * A call to this method will add the given error handler to the chain + * + * @param errorHandler + */ + public void addErrorHandler(IPublicationErrorHandler errorHandler); + + /** + * Returns an immutable collection containing all the registered error handlers + * + * @return + */ + public Collection getRegisteredErrorHandlers(); + + public Executor getExecutor(); + public static interface IPostCommand{ diff --git a/src/main/java/org/mbassy/common/IPublicationErrorHandler.java b/src/main/java/org/mbassy/IPublicationErrorHandler.java similarity index 88% rename from src/main/java/org/mbassy/common/IPublicationErrorHandler.java rename to src/main/java/org/mbassy/IPublicationErrorHandler.java index 1789961..14780c2 100644 --- a/src/main/java/org/mbassy/common/IPublicationErrorHandler.java +++ b/src/main/java/org/mbassy/IPublicationErrorHandler.java @@ -1,4 +1,4 @@ -package org.mbassy.common; +package org.mbassy; /** * TODO. Insert class description here diff --git a/src/main/java/org/mbassy/MBassador.java b/src/main/java/org/mbassy/MBassador.java index eb5e62e..c9684ff 100644 --- a/src/main/java/org/mbassy/MBassador.java +++ b/src/main/java/org/mbassy/MBassador.java @@ -1,102 +1,32 @@ package org.mbassy; -import org.mbassy.filter.Filter; -import org.mbassy.filter.MessageFilter; -import org.mbassy.common.*; +import org.mbassy.subscription.*; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.*; -public class MBassador implements IMessageBus{ - - - // This predicate is used to find all message listeners (methods annotated with @Listener) - private static final IPredicate AllMessageListeners = new IPredicate() { - @Override - public boolean apply(Method target) { - return target.getAnnotation(Listener.class) != null; - } - }; - - // This is the default error handler it will simply log to standard out and - // print stack trace if available - protected static final class ConsoleLogger implements IPublicationErrorHandler { - @Override - public void handleError(PublicationError error) { - System.out.println(error); - if (error.getCause() != null) error.getCause().printStackTrace(); - } - }; - - // executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost - private ExecutorService executor; - - // cache already created filter instances - private final Map, MessageFilter> filterCache = new HashMap, MessageFilter>(); - - // all subscriptions per message type - // this is the primary list for dispatching a specific message - // write access is synchronized and happens very infrequently - private final Map> subscriptionsPerMessage = new HashMap(50); - - // all subscriptions per messageHandler type - // this list provides fast access for subscribing and unsubscribing - private final Map> subscriptionsPerListener = new HashMap(50); - - // remember already processed classes that do not contain any listeners - private final Collection nonListeners = new HashSet(); - - // this handler will receive all errors that occur during message dispatch or message handling - private IPublicationErrorHandler errorHandler = new ConsoleLogger(); - - - // all threads that are available for asynchronous message dispatching - private final CopyOnWriteArrayList dispatchers = new CopyOnWriteArrayList(); - - // all pending messages scheduled for asynchronous dispatch are queued here - private final LinkedBlockingQueue pendingMessages = new LinkedBlockingQueue(); - - // initialize the dispatch workers - private void initDispatcherThreads(int numberOfThreads) { - for (int i = 0; i < numberOfThreads; i++) { - // each thread will run forever and process incoming - //dispatch requests - Thread dispatcher = new Thread(new Runnable() { - public void run() { - while (true) { - try { - publish(pendingMessages.take()); - } catch (InterruptedException e) { - errorHandler.handleError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null)); - return; - } - } - } - }); - dispatchers.add(dispatcher); - dispatcher.start(); - } - } +public class MBassador extends AbstractMessageBus>{ public MBassador(){ this(2); } public MBassador(int dispatcherThreadCount){ - this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue())); + super(dispatcherThreadCount); } public MBassador(int dispatcherThreadCount, ExecutorService executor){ - this.executor = executor; - initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2); + super(dispatcherThreadCount,executor); } + @Override + protected SubscriptionFactory getSubscriptionFactory() { + return new SubscriptionFactory(this); + } public void publishAsync(T message){ - pendingMessages.offer(message); + addAsynchronousDeliveryRequest(new SubscriptionDeliveryRequest(getSubscriptionsByMessageType(message.getClass()), message)); } @@ -125,370 +55,9 @@ public class MBassador implements IMessageBus{ } - public void unsubscribe(Object listener){ - if (listener == null) return; - Collection subscriptions = subscriptionsPerListener.get(listener.getClass()); - if(subscriptions == null)return; - for (Subscription subscription : subscriptions) { - subscription.unsubscribe(listener); - } - } - @Override public SimplePostCommand post(T message) { return new SimplePostCommand(this, message); } - public void subscribe(Object listener){ - Class listeningClass = listener.getClass(); - if (nonListeners.contains(listeningClass)) - return; // early reject of known classes that do not participate in eventing - Collection subscriptionsByListener = subscriptionsPerListener.get(listeningClass); - if (subscriptionsByListener == null) { // if the type is registered for the first time - synchronized (this) { // new subscriptions must be processed sequentially for each class - subscriptionsByListener = subscriptionsPerListener.get(listeningClass); - if (subscriptionsByListener == null) { // double check (a bit ugly but works here) - List messageHandlers = getListeners(listeningClass); // get all methods with subscriptions - subscriptionsByListener = new ArrayList(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only) - if (messageHandlers.isEmpty()) { // remember the class as non listening class - nonListeners.add(listeningClass); - return; - } - // create subscriptions for all detected listeners - for (Method messageHandler : messageHandlers) { - if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners - MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class)); - Class eventType = getMessageType(messageHandler); - Subscription subscription = createSubscription(messageHandler, filter); - subscription.subscribe(listener); - addMessageTypeSubscription(eventType, subscription); - subscriptionsByListener.add(subscription); - //updateMessageTypeHierarchy(eventType); - } - subscriptionsPerListener.put(listeningClass, subscriptionsByListener); - } - } - } - // register the listener to the existing subscriptions - for (Subscription sub : subscriptionsByListener) sub.subscribe(listener); - } - - - public void setErrorHandler(IPublicationErrorHandler handler){ - this.errorHandler = handler; - } - - - - // obtain the set of subscriptions for the given message type - private Collection getSubscriptionsByMessageType(Class messageType) { - List subscriptions = new LinkedList(); - - if(subscriptionsPerMessage.get(messageType) != null) { - subscriptions.addAll(subscriptionsPerMessage.get(messageType)); - } - for (Class eventSuperType : getSuperclasses(messageType)){ - if(subscriptionsPerMessage.get(eventSuperType) != null){ - subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType)); - } - } - // IMPROVEMENT: use tree list that sorts during insertion - //Collections.sort(subscriptions, new SubscriptionByPriorityDesc()); - return subscriptions; - } - - private Collection getSuperclasses(Class from){ - Collection superclasses = new LinkedList(); - while(!from.equals(Object.class)){ - superclasses.add(from.getSuperclass()); - from = from.getSuperclass(); - } - return superclasses; - } - - // associate a suscription with a message type - private void addMessageTypeSubscription(Class messageType, Subscription subscription) { - Collection subscriptions = subscriptionsPerMessage.get(messageType); - if (subscriptions == null) { - subscriptions = new CopyOnWriteArraySet(); - subscriptionsPerMessage.put(messageType, subscriptions); - } - subscriptions.add(subscription); - } - - - private boolean isValidMessageHandler(Method handler) { - if (handler.getParameterTypes().length != 1) { - // a messageHandler only defines one parameter (the message) - System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName() - + "]. A messageHandler must define exactly one parameter"); - return false; - } - return true; - } - - private static Class getMessageType(Method listener) { - return listener.getParameterTypes()[0]; - } - - // get all listeners defined by the given class (includes - // listeners defined in super classes) - private static List getListeners(Class target) { - return ReflectionUtils.getMethods(AllMessageListeners, target); - } - - // retrieve all instances of filters associated with the given subscription - private MessageFilter[] getFilter(Listener subscription) { - if (subscription.value().length == 0) return null; - MessageFilter[] filters = new MessageFilter[subscription.value().length]; - int i = 0; - for (Filter filterDef : subscription.value()) { - MessageFilter filter = filterCache.get(filterDef.value()); - if (filter == null) { - try { - filter = filterDef.value().newInstance(); - filterCache.put(filterDef.value(), filter); - } catch (Throwable e) { - handlePublicationError(new PublicationError() - .setMessage("Error retrieving filter")); - } - - } - filters[i] = filter; - i++; - } - return filters; - } - - - - private void handlePublicationError(PublicationError error) { - errorHandler.handleError(error); - } - - @Override - protected void finalize() throws Throwable { - super.finalize(); - for(Thread dispatcher : dispatchers){ - dispatcher.interrupt(); - } - } - - - private Subscription createSubscription(Method messageHandler, MessageFilter[] filter){ - if(filter == null || filter.length == 0){ - if(isAsynchronous(messageHandler)){ - return new UnfilteredAsynchronousSubscription(messageHandler); - } - else{ - return new UnfilteredSynchronousSubscription(messageHandler); - } - } - else{ - if(isAsynchronous(messageHandler)){ - return new FilteredAsynchronousSubscription(messageHandler, filter); - } - else{ - return new FilteredSynchronousSubscription(messageHandler, filter); - } - } - } - - private boolean isAsynchronous(Method messageHandler){ - return messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous); - } - - - /** - * Subscription is a thread safe container for objects that contain message handlers - */ - private abstract class Subscription { - - private final Method messageHandler; - - protected ConcurrentSet listeners = new ConcurrentSet(); - - private int priority = 0; - - private Subscription(Method messageHandler) { - // TODO: init priority - this.messageHandler = messageHandler; - this.messageHandler.setAccessible(true); - } - - protected abstract void publish(Object message); - - protected abstract void dispatch(final Object message, final Object listener); - - - public int getPriority(){ - return priority; - } - - - public void subscribe(Object o) { - listeners.add(o); - - } - - protected void invokeHandler(final Object message, final Object listener){ - try { - messageHandler.invoke(listener, message); - }catch(IllegalAccessException e){ - MBassador.this.handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "The class or method is not accessible", - messageHandler, listener, message)); - } - catch(IllegalArgumentException e){ - MBassador.this.handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + messageHandler.getParameterTypes()[0], - messageHandler, listener, message)); - } - catch (InvocationTargetException e) { - MBassador.this.handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Message handler threw exception", - messageHandler, listener, message)); - } - catch (Throwable e) { - MBassador.this.handlePublicationError( - new PublicationError(e, "Error during messageHandler notification. " + - "Unexpected exception", - messageHandler, listener, message)); - } - } - - - public void unsubscribe(Object existingListener) { - listeners.remove(existingListener); - } - - - - - } - - private abstract class UnfilteredSubscription extends Subscription{ - - - private UnfilteredSubscription(Method messageHandler) { - super(messageHandler); - } - - public void publish(Object message) { - - Iterator iterator = listeners.iterator(); - Object listener = null; - while ((listener = iterator.next()) != null) { - dispatch(message, listener); - } - } - } - - private class UnfilteredAsynchronousSubscription extends UnfilteredSubscription{ - - - private UnfilteredAsynchronousSubscription(Method messageHandler) { - super(messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - MBassador.this.executor.execute(new Runnable() { - @Override - public void run() { - invokeHandler(message, listener); - } - }); - - } - } - - private class UnfilteredSynchronousSubscription extends UnfilteredSubscription{ - - - private UnfilteredSynchronousSubscription(Method messageHandler) { - super(messageHandler); - } - - protected void dispatch(final Object message, final Object listener){ - invokeHandler(message, listener); - } - } - - private abstract class FilteredSubscription extends Subscription{ - - private final MessageFilter[] filter; - - - private FilteredSubscription(Method messageHandler, MessageFilter[] filter) { - super(messageHandler); - this.filter = filter; - } - - private boolean passesFilter(Object message, Object listener) { - - if (filter == null) { - return true; - } - else { - for (int i = 0; i < filter.length; i++) { - if (!filter[i].accepts(message, listener)) return false; - } - return true; - } - } - - protected void publish(Object message) { - - Iterator iterator = listeners.iterator(); - Object listener = null; - while ((listener = iterator.next()) != null) { - if(passesFilter(message, listener)) { - dispatch(message, listener); - } - } - } - } - - private class FilteredSynchronousSubscription extends FilteredSubscription{ - - - private FilteredSynchronousSubscription(Method messageHandler, MessageFilter[] filter) { - super(messageHandler, filter); - } - - protected void dispatch(final Object message, final Object listener){ - MBassador.this.executor.execute(new Runnable() { - @Override - public void run() { - invokeHandler(message, listener); - } - }); - - } - } - - private class FilteredAsynchronousSubscription extends FilteredSubscription{ - - - private FilteredAsynchronousSubscription(Method messageHandler, MessageFilter[] filter) { - super(messageHandler, filter); - } - - protected void dispatch(final Object message, final Object listener){ - invokeHandler(message, listener); - } - } - - - private final class SubscriptionByPriorityDesc implements Comparator { - @Override - public int compare(Subscription o1, Subscription o2) { - return o1.getPriority() - o2.getPriority(); - } - }; - } diff --git a/src/main/java/org/mbassy/common/PublicationError.java b/src/main/java/org/mbassy/PublicationError.java similarity index 98% rename from src/main/java/org/mbassy/common/PublicationError.java rename to src/main/java/org/mbassy/PublicationError.java index e117571..844e54a 100644 --- a/src/main/java/org/mbassy/common/PublicationError.java +++ b/src/main/java/org/mbassy/PublicationError.java @@ -1,4 +1,4 @@ -package org.mbassy.common; +package org.mbassy; import java.lang.reflect.Method; diff --git a/src/main/java/org/mbassy/filter/Filter.java b/src/main/java/org/mbassy/listener/Filter.java similarity index 95% rename from src/main/java/org/mbassy/filter/Filter.java rename to src/main/java/org/mbassy/listener/Filter.java index 6da4e42..370dac1 100644 --- a/src/main/java/org/mbassy/filter/Filter.java +++ b/src/main/java/org/mbassy/listener/Filter.java @@ -1,4 +1,4 @@ -package org.mbassy.filter; +package org.mbassy.listener; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; diff --git a/src/main/java/org/mbassy/Listener.java b/src/main/java/org/mbassy/listener/Listener.java similarity index 53% rename from src/main/java/org/mbassy/Listener.java rename to src/main/java/org/mbassy/listener/Listener.java index b5f2985..dbaefae 100644 --- a/src/main/java/org/mbassy/Listener.java +++ b/src/main/java/org/mbassy/listener/Listener.java @@ -1,6 +1,4 @@ -package org.mbassy; - -import org.mbassy.filter.Filter; +package org.mbassy.listener; import java.lang.annotation.*; @@ -16,12 +14,10 @@ import java.lang.annotation.*; @Target(value = {ElementType.METHOD}) public @interface Listener { - Filter[] value() default {}; // no filters by default + Filter[] filters() default {}; // no filters by default - Dispatch mode() default Dispatch.Synchronous; + Mode dispatch() default Mode.Synchronous; - public static enum Dispatch{ - Synchronous,Asynchronous - } + int priority() default 0; } diff --git a/src/main/java/org/mbassy/filter/MessageFilter.java b/src/main/java/org/mbassy/listener/MessageFilter.java similarity index 97% rename from src/main/java/org/mbassy/filter/MessageFilter.java rename to src/main/java/org/mbassy/listener/MessageFilter.java index 8cc1e8d..42ad428 100644 --- a/src/main/java/org/mbassy/filter/MessageFilter.java +++ b/src/main/java/org/mbassy/listener/MessageFilter.java @@ -1,4 +1,4 @@ -package org.mbassy.filter; +package org.mbassy.listener; /** * Object filters can be used to prevent certain messages to be delivered to a specific listener. diff --git a/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java b/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java new file mode 100644 index 0000000..55b8a3e --- /dev/null +++ b/src/main/java/org/mbassy/listener/MessageHandlerMetadata.java @@ -0,0 +1,51 @@ +package org.mbassy.listener; + +import org.mbassy.listener.Listener; +import org.mbassy.listener.Mode; +import org.mbassy.listener.MessageFilter; + +import java.lang.reflect.Method; + +/** + * User: benni + * Date: 11/14/12 + */ +public class MessageHandlerMetadata { + + private Method handler; + + private MessageFilter[] filter; + + private Listener listenerConfig; + + private boolean isAsynchronous = false; + + + public MessageHandlerMetadata(Method handler, MessageFilter[] filter, Listener listenerConfig) { + this.handler = handler; + this.filter = filter; + this.listenerConfig = listenerConfig; + this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous); + } + + + public boolean isAsynchronous(){ + return isAsynchronous; + } + + public boolean isFiltered(){ + return filter == null || filter.length == 0; + } + + public int getPriority(){ + return listenerConfig.priority(); + } + + public Method getHandler() { + return handler; + } + + public MessageFilter[] getFilter() { + return filter; + } +} diff --git a/src/main/java/org/mbassy/listener/MetadataReader.java b/src/main/java/org/mbassy/listener/MetadataReader.java new file mode 100644 index 0000000..b5e239b --- /dev/null +++ b/src/main/java/org/mbassy/listener/MetadataReader.java @@ -0,0 +1,43 @@ +package org.mbassy.listener; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * Created with IntelliJ IDEA. + * User: benni + * Date: 11/16/12 + * Time: 10:22 AM + * To change this template use File | Settings | File Templates. + */ +public class MetadataReader { + + // cache already created filter instances + private final Map, MessageFilter> filterCache = new HashMap, MessageFilter>(); + + // retrieve all instances of filters associated with the given subscription + private MessageFilter[] getFilter(Listener subscription) throws Exception{ + if (subscription.filters().length == 0) return null; + MessageFilter[] filters = new MessageFilter[subscription.filters().length]; + int i = 0; + for (Filter filterDef : subscription.filters()) { + MessageFilter filter = filterCache.get(filterDef.value()); + if (filter == null) { + filter = filterDef.value().newInstance(); + filterCache.put(filterDef.value(), filter); + + } + filters[i] = filter; + i++; + } + return filters; + } + + + public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) throws Exception{ + Listener config = messageHandler.getAnnotation(Listener.class); + MessageFilter[] filter = getFilter(config); + return new MessageHandlerMetadata(messageHandler, filter, config); + } +} diff --git a/src/main/java/org/mbassy/listener/Mode.java b/src/main/java/org/mbassy/listener/Mode.java new file mode 100644 index 0000000..4a8095d --- /dev/null +++ b/src/main/java/org/mbassy/listener/Mode.java @@ -0,0 +1,12 @@ +package org.mbassy.listener; + +/** +* Created with IntelliJ IDEA. +* User: benni +* Date: 11/16/12 +* Time: 10:01 AM +* To change this template use File | Settings | File Templates. +*/ +public enum Mode { + Synchronous,Asynchronous +} diff --git a/src/test/java/org/mbassy/MBassadorTest.java b/src/test/java/org/mbassy/MBassadorTest.java index 7ef79ae..2c096ea 100644 --- a/src/test/java/org/mbassy/MBassadorTest.java +++ b/src/test/java/org/mbassy/MBassadorTest.java @@ -2,8 +2,10 @@ package org.mbassy; import org.junit.Assert; import org.junit.Test; -import org.mbassy.filter.Filter; -import org.mbassy.filter.MessageFilter; +import org.mbassy.listener.Filter; +import org.mbassy.listener.Listener; +import org.mbassy.listener.MessageFilter; +import org.mbassy.listener.Mode; import java.util.ArrayList; import java.util.List; @@ -187,14 +189,17 @@ public class MBassadorTest { } // this handler will be invoked asynchronously - @Listener(mode = Listener.Dispatch.Asynchronous) + @Listener(priority = 0, dispatch = Mode.Asynchronous) public void handleSubTestEvent(SubTestEvent event) { event.counter.incrementAndGet(); } // this handler will receive events of type SubTestEvent // or any subtabe and that passes the given filter - @Listener({@Filter(MessageFilter.None.class),@Filter(MessageFilter.All.class)}) + @Listener( + priority = 10, + dispatch = Mode.Synchronous, + filters = {@Filter(MessageFilter.None.class),@Filter(MessageFilter.All.class)}) public void handleFiltered(SubTestEvent event) { event.counter.incrementAndGet(); }