diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index 71e378a..544d302 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -10,7 +10,7 @@ import java.util.UUID; /** * A subscription is a thread-safe container that manages exactly one message handler of all registered - * message listeners of the same class, i.e. all subscribed instances of a SingleMessageHandler.class + * message listeners of the same class, i.e. all subscribed instances (exlcuding subclasses) of a SingleMessageHandler.class * will be referenced in the subscription created for SingleMessageHandler.class. * * There will be as many unique subscription objects per message listener class as there are message handlers @@ -96,8 +96,8 @@ public class Subscription { public static final Comparator SubscriptionByPriorityDesc = new Comparator() { @Override public int compare(Subscription o1, Subscription o2) { - int byPriority = ((Integer)o1.getPriority()).compareTo(o2.getPriority()); - return byPriority == 0 ? o1.id.compareTo(o2.id) : byPriority; + int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority()); + return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority; } }; diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java index cb95e60..159138c 100644 --- a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -2,6 +2,7 @@ package net.engio.mbassy; import net.engio.mbassy.bus.BusRuntime; import net.engio.mbassy.common.*; +import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listeners.*; import net.engio.mbassy.messages.*; @@ -198,6 +199,19 @@ public class SubscriptionManagerTest extends AssertSupport { runTestWith(listeners, expectedSubscriptions); } + @Test + public void testPrioritizedMessageHandlers(){ + ListenerFactory listeners = listeners(PrioritizedListener.class); + + SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime()); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(PrioritizedListener.class).handles(IMessage.class, IMessage.class, IMessage.class, IMessage.class); + + runTestWith(listeners, expectedSubscriptions); + } + private BusRuntime mockedRuntime(){ return new BusRuntime(null) .add("error.handlers", Collections.EMPTY_SET) @@ -230,5 +244,29 @@ public class SubscriptionManagerTest extends AssertSupport { + public static class PrioritizedListener{ + + + @Handler(priority = 1) + public void handlePrio1(IMessage message){ + message.handled(this.getClass()); + } + + @Handler(priority = 2) + public void handlePrio2(IMessage message){ + message.handled(this.getClass()); + } + + @Handler(priority = 3) + public void handlePrio3(IMessage message){ + message.handled(this.getClass()); + } + + @Handler(priority = 4) + public void handlePrio4(IMessage message){ + message.handled(this.getClass()); + } + } + } diff --git a/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java index d967cbb..20d9e6f 100644 --- a/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java +++ b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java @@ -36,6 +36,7 @@ public class SubscriptionValidator extends AssertSupport{ public void validate(SubscriptionManager manager){ for(Class messageType : messageTypes){ Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); + ensureOrdering(subscriptions); Collection validationEntries = getEntries(EntriesByMessageType(messageType)); assertEquals(subscriptions.size(), validationEntries.size()); for(ValidationEntry validationValidationEntry : validationEntries){ @@ -53,6 +54,14 @@ public class SubscriptionValidator extends AssertSupport{ } } + private void ensureOrdering(Collection subscriptions){ + int lastPriority = Integer.MAX_VALUE;// highest priority possible + for(Subscription sub : subscriptions){ + assertTrue("Subscriptions should be ordered by priority (DESC)", lastPriority >= sub.getPriority()); + lastPriority = sub.getPriority(); + } + } + private Collection getEntries(IPredicate filter){ Collection matching = new LinkedList(); diff --git a/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java index 33823d5..94e23ea 100644 --- a/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java @@ -29,7 +29,7 @@ public class AbstractMessageListener { public static class NoSubtypesListener extends BaseListener { - @Handler(rejectSubtypes = true) + @Handler(rejectSubtypes = true, priority = 4) public void handle(AbstractMessage message){ super.handle(message); } @@ -38,7 +38,7 @@ public class AbstractMessageListener { public static class AsyncListener extends BaseListener { - @Handler(delivery = Invoke.Asynchronously) + @Handler(delivery = Invoke.Asynchronously, priority = Integer.MAX_VALUE) public void handle(AbstractMessage message){ super.handle(message); } diff --git a/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java index d728ddb..9fc108f 100644 --- a/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java @@ -29,7 +29,7 @@ public class IMultipartMessageListener { public static class NoSubtypesListener extends BaseListener { - @Handler(rejectSubtypes = true) + @Handler(rejectSubtypes = true, priority = Integer.MIN_VALUE) public void handle(IMultipartMessage message){ super.handle(message); } @@ -38,7 +38,7 @@ public class IMultipartMessageListener { public static class AsyncListener extends BaseListener { - @Handler(delivery = Invoke.Asynchronously) + @Handler(delivery = Invoke.Asynchronously, priority = Integer.MIN_VALUE) public void handle(IMultipartMessage message){ super.handle(message); } @@ -47,7 +47,7 @@ public class IMultipartMessageListener { public static class DisabledListener extends BaseListener { - @Handler(enabled = false) + @Handler(enabled = false , priority = 4) public void handle(IMultipartMessage message){ super.handle(message); } diff --git a/src/test/java/net/engio/mbassy/listeners/ObjectListener.java b/src/test/java/net/engio/mbassy/listeners/ObjectListener.java index a9b401b..58923cc 100644 --- a/src/test/java/net/engio/mbassy/listeners/ObjectListener.java +++ b/src/test/java/net/engio/mbassy/listeners/ObjectListener.java @@ -1,6 +1,5 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.listener.Handler; import java.util.Collections; @@ -12,7 +11,7 @@ public class ObjectListener { private List handledMessages = Collections.synchronizedList(new LinkedList()); - @Handler + @Handler(priority = Integer.MAX_VALUE) public void handle(Object message){ handledMessages.add(message); } diff --git a/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java index 41f8f1a..77a94ed 100644 --- a/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java +++ b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java @@ -13,7 +13,7 @@ public class StandardMessageListener { private static abstract class BaseListener { - @Handler + @Handler(priority = 3) public void handle(StandardMessage message){ message.handled(this.getClass()); } @@ -29,7 +29,7 @@ public class StandardMessageListener { public static class NoSubtypesListener extends BaseListener { - @Handler(rejectSubtypes = true) + @Handler(rejectSubtypes = true, priority = 4) public void handle(StandardMessage message){ super.handle(message); } @@ -38,7 +38,7 @@ public class StandardMessageListener { public static class AsyncListener extends BaseListener { - @Handler(delivery = Invoke.Asynchronously) + @Handler(delivery = Invoke.Asynchronously, priority = -10) public void handle(StandardMessage message){ super.handle(message); }