diff --git a/src/main/java/org/mbassy/subscription/Subscription.java b/src/main/java/org/mbassy/subscription/Subscription.java new file mode 100644 index 0000000..6df9959 --- /dev/null +++ b/src/main/java/org/mbassy/subscription/Subscription.java @@ -0,0 +1,104 @@ +package org.mbassy.subscription; + +import org.mbassy.IMessageBus; +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.common.ConcurrentSet; +import org.mbassy.PublicationError; +import org.mbassy.listener.MessageHandlerMetadata; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Comparator; + +/** + * Subscription is a thread safe container for objects that contain message handlers + */ +public abstract class Subscription { + + private final Method handler; + + protected ConcurrentSet listeners = new ConcurrentSet(); + + private int priority = 0; + + private IMessageBus owningBus ; + + public Subscription(IMessageBus owningBus, MessageHandlerMetadata messageHandler) { + this.owningBus = owningBus; + this.priority = messageHandler.getPriority(); + this.handler = messageHandler.getHandler(); + this.handler.setAccessible(true); + } + + public abstract void publish(Object message); + + protected abstract void dispatch(final Object message, final Object listener); + + + protected IMessageBus getMessageBus(){ + return owningBus; + } + + public int getPriority(){ + return priority; + } + + + public void subscribe(Object o) { + listeners.add(o); + + } + + protected void handlePublicationError(PublicationError error){ + Collection handlers = owningBus.getRegisteredErrorHandlers(); + for(IPublicationErrorHandler handler : handlers){ + handler.handleError(error); + } + } + + protected void invokeHandler(final Object message, final Object listener){ + try { + handler.invoke(listener, message); + }catch(IllegalAccessException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "The class or method is not accessible", + handler, listener, message)); + } + catch(IllegalArgumentException e){ + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0], + handler, listener, message)); + } + catch (InvocationTargetException e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Message handler threw exception", + handler, listener, message)); + } + catch (Throwable e) { + handlePublicationError( + new PublicationError(e, "Error during messageHandler notification. " + + "Unexpected exception", + handler, listener, message)); + } + } + + + public void unsubscribe(Object existingListener) { + listeners.remove(existingListener); + } + + + public static final Comparator SubscriptionByPriorityDesc = new Comparator() { + @Override + public int compare(Subscription o1, Subscription o2) { + int result = o1.getPriority() - o2.getPriority(); + return result == 0 ? o1.handler.hashCode() - o2.handler.hashCode() : result; + } + }; + +} diff --git a/src/main/java/org/mbassy/subscription/SubscriptionDeliveryRequest.java b/src/main/java/org/mbassy/subscription/SubscriptionDeliveryRequest.java new file mode 100644 index 0000000..e280608 --- /dev/null +++ b/src/main/java/org/mbassy/subscription/SubscriptionDeliveryRequest.java @@ -0,0 +1,38 @@ +package org.mbassy.subscription; + +import java.util.Collection; +import java.util.LinkedList; + +/** + * User: benni + * Date: 11/16/12 + */ +public class SubscriptionDeliveryRequest { + + private Collection subscriptions; + + private T message; + + public SubscriptionDeliveryRequest(Collection subscriptions, T message) { + this.subscriptions = subscriptions; + this.message = message; + } + + public SubscriptionDeliveryRequest(T message){ + this.message = message; + subscriptions = new LinkedList(); + } + + public boolean addAll(Collection c) { + return subscriptions.addAll(c); + } + + public boolean add(Subscription subscription) { + return subscriptions.add(subscription); + } + + public void execute(){ + for(Subscription sub : subscriptions) + sub.publish(message); + } +} diff --git a/src/main/java/org/mbassy/subscription/SubscriptionFactory.java b/src/main/java/org/mbassy/subscription/SubscriptionFactory.java new file mode 100644 index 0000000..5f3e57e --- /dev/null +++ b/src/main/java/org/mbassy/subscription/SubscriptionFactory.java @@ -0,0 +1,43 @@ +package org.mbassy.subscription; + +import org.mbassy.IMessageBus; +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.listener.MessageHandlerMetadata; + +import java.util.Collection; + +/** + * Created with IntelliJ IDEA. + * User: benni + * Date: 11/16/12 + * Time: 10:39 AM + * To change this template use File | Settings | File Templates. + */ +public class SubscriptionFactory { + + private IMessageBus owner; + + + public SubscriptionFactory(IMessageBus owner) { + this.owner = owner; + } + + public Subscription createSubscription(MessageHandlerMetadata messageHandlerMetadata){ + if(messageHandlerMetadata.isFiltered()){ + if(messageHandlerMetadata.isAsynchronous()){ + return new UnfilteredAsynchronousSubscription(owner, messageHandlerMetadata); + } + else{ + return new UnfilteredSynchronousSubscription(owner, messageHandlerMetadata); + } + } + else{ + if(messageHandlerMetadata.isAsynchronous()){ + return new FilteredAsynchronousSubscription(owner, messageHandlerMetadata); + } + else{ + return new FilteredSynchronousSubscription(owner, messageHandlerMetadata); + } + } + } +} diff --git a/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java new file mode 100644 index 0000000..89bd340 --- /dev/null +++ b/src/main/java/org/mbassy/subscription/UnfilteredAsynchronousSubscription.java @@ -0,0 +1,33 @@ +package org.mbassy.subscription; + +import org.mbassy.IMessageBus; +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.MBassador; +import org.mbassy.listener.MessageHandlerMetadata; + +import java.lang.reflect.Method; +import java.util.Collection; + +/** +* Created with IntelliJ IDEA. +* User: benni +* Date: 11/14/12 +* Time: 3:48 PM +* To change this template use File | Settings | File Templates. +*/ +public class UnfilteredAsynchronousSubscription extends UnfilteredSubscription { + + public UnfilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { + super(mBassador, messageHandler); + } + + protected void dispatch(final Object message, final Object listener){ + getMessageBus().getExecutor().execute(new Runnable() { + @Override + public void run() { + invokeHandler(message, listener); + } + }); + + } +} diff --git a/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java new file mode 100644 index 0000000..8e40be9 --- /dev/null +++ b/src/main/java/org/mbassy/subscription/UnfilteredSubscription.java @@ -0,0 +1,34 @@ +package org.mbassy.subscription; + +import org.mbassy.IMessageBus; +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.MBassador; +import org.mbassy.listener.MessageHandlerMetadata; + +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Iterator; + +/** +* Created with IntelliJ IDEA. +* User: benni +* Date: 11/14/12 +* Time: 3:45 PM +* To change this template use File | Settings | File Templates. +*/ +public abstract class UnfilteredSubscription extends Subscription{ + + + public UnfilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { + super(mBassador, messageHandler); + } + + public void publish(Object message) { + + Iterator iterator = listeners.iterator(); + Object listener = null; + while ((listener = iterator.next()) != null) { + dispatch(message, listener); + } + } +} diff --git a/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java b/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java new file mode 100644 index 0000000..5689602 --- /dev/null +++ b/src/main/java/org/mbassy/subscription/UnfilteredSynchronousSubscription.java @@ -0,0 +1,27 @@ +package org.mbassy.subscription; + +import org.mbassy.IMessageBus; +import org.mbassy.IPublicationErrorHandler; +import org.mbassy.MBassador; +import org.mbassy.listener.MessageHandlerMetadata; + +import java.lang.reflect.Method; +import java.util.Collection; + +/** +* Created with IntelliJ IDEA. +* User: benni +* Date: 11/14/12 +* Time: 3:49 PM +* To change this template use File | Settings | File Templates. +*/ +public class UnfilteredSynchronousSubscription extends UnfilteredSubscription{ + + public UnfilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) { + super(mBassador, messageHandler); + } + + protected void dispatch(final Object message, final Object listener){ + invokeHandler(message, listener); + } +}