fixed priority handling, extended respective unit tests
This commit is contained in:
parent
0d36eb65c7
commit
d2175fa215
@ -10,7 +10,7 @@ import java.util.UUID;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* A subscription is a thread-safe container that manages exactly one message handler of all registered
|
* A subscription is a thread-safe container that manages exactly one message handler of all registered
|
||||||
* message listeners of the same class, i.e. all subscribed instances of a SingleMessageHandler.class
|
* message listeners of the same class, i.e. all subscribed instances (exlcuding subclasses) of a SingleMessageHandler.class
|
||||||
* will be referenced in the subscription created for SingleMessageHandler.class.
|
* will be referenced in the subscription created for SingleMessageHandler.class.
|
||||||
*
|
*
|
||||||
* There will be as many unique subscription objects per message listener class as there are message handlers
|
* There will be as many unique subscription objects per message listener class as there are message handlers
|
||||||
@ -96,8 +96,8 @@ public class Subscription {
|
|||||||
public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
|
public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(Subscription o1, Subscription o2) {
|
public int compare(Subscription o1, Subscription o2) {
|
||||||
int byPriority = ((Integer)o1.getPriority()).compareTo(o2.getPriority());
|
int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
|
||||||
return byPriority == 0 ? o1.id.compareTo(o2.id) : byPriority;
|
return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package net.engio.mbassy;
|
|||||||
|
|
||||||
import net.engio.mbassy.bus.BusRuntime;
|
import net.engio.mbassy.bus.BusRuntime;
|
||||||
import net.engio.mbassy.common.*;
|
import net.engio.mbassy.common.*;
|
||||||
|
import net.engio.mbassy.listener.Handler;
|
||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
import net.engio.mbassy.listeners.*;
|
import net.engio.mbassy.listeners.*;
|
||||||
import net.engio.mbassy.messages.*;
|
import net.engio.mbassy.messages.*;
|
||||||
@ -198,6 +199,19 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||||||
runTestWith(listeners, expectedSubscriptions);
|
runTestWith(listeners, expectedSubscriptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrioritizedMessageHandlers(){
|
||||||
|
ListenerFactory listeners = listeners(PrioritizedListener.class);
|
||||||
|
|
||||||
|
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory(), mockedRuntime());
|
||||||
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
|
||||||
|
|
||||||
|
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
|
||||||
|
.listener(PrioritizedListener.class).handles(IMessage.class, IMessage.class, IMessage.class, IMessage.class);
|
||||||
|
|
||||||
|
runTestWith(listeners, expectedSubscriptions);
|
||||||
|
}
|
||||||
|
|
||||||
private BusRuntime mockedRuntime(){
|
private BusRuntime mockedRuntime(){
|
||||||
return new BusRuntime(null)
|
return new BusRuntime(null)
|
||||||
.add("error.handlers", Collections.EMPTY_SET)
|
.add("error.handlers", Collections.EMPTY_SET)
|
||||||
@ -230,5 +244,29 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public static class PrioritizedListener{
|
||||||
|
|
||||||
|
|
||||||
|
@Handler(priority = 1)
|
||||||
|
public void handlePrio1(IMessage message){
|
||||||
|
message.handled(this.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 2)
|
||||||
|
public void handlePrio2(IMessage message){
|
||||||
|
message.handled(this.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 3)
|
||||||
|
public void handlePrio3(IMessage message){
|
||||||
|
message.handled(this.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Handler(priority = 4)
|
||||||
|
public void handlePrio4(IMessage message){
|
||||||
|
message.handled(this.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
public void validate(SubscriptionManager manager){
|
public void validate(SubscriptionManager manager){
|
||||||
for(Class messageType : messageTypes){
|
for(Class messageType : messageTypes){
|
||||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
|
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
|
||||||
|
ensureOrdering(subscriptions);
|
||||||
Collection<ValidationEntry> validationEntries = getEntries(EntriesByMessageType(messageType));
|
Collection<ValidationEntry> validationEntries = getEntries(EntriesByMessageType(messageType));
|
||||||
assertEquals(subscriptions.size(), validationEntries.size());
|
assertEquals(subscriptions.size(), validationEntries.size());
|
||||||
for(ValidationEntry validationValidationEntry : validationEntries){
|
for(ValidationEntry validationValidationEntry : validationEntries){
|
||||||
@ -53,6 +54,14 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void ensureOrdering(Collection<Subscription> subscriptions){
|
||||||
|
int lastPriority = Integer.MAX_VALUE;// highest priority possible
|
||||||
|
for(Subscription sub : subscriptions){
|
||||||
|
assertTrue("Subscriptions should be ordered by priority (DESC)", lastPriority >= sub.getPriority());
|
||||||
|
lastPriority = sub.getPriority();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private Collection<ValidationEntry> getEntries(IPredicate<ValidationEntry> filter){
|
private Collection<ValidationEntry> getEntries(IPredicate<ValidationEntry> filter){
|
||||||
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
|
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
|
||||||
|
@ -29,7 +29,7 @@ public class AbstractMessageListener {
|
|||||||
|
|
||||||
public static class NoSubtypesListener extends BaseListener {
|
public static class NoSubtypesListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(rejectSubtypes = true)
|
@Handler(rejectSubtypes = true, priority = 4)
|
||||||
public void handle(AbstractMessage message){
|
public void handle(AbstractMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
@ -38,7 +38,7 @@ public class AbstractMessageListener {
|
|||||||
|
|
||||||
public static class AsyncListener extends BaseListener {
|
public static class AsyncListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(delivery = Invoke.Asynchronously)
|
@Handler(delivery = Invoke.Asynchronously, priority = Integer.MAX_VALUE)
|
||||||
public void handle(AbstractMessage message){
|
public void handle(AbstractMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ public class IMultipartMessageListener {
|
|||||||
|
|
||||||
public static class NoSubtypesListener extends BaseListener {
|
public static class NoSubtypesListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(rejectSubtypes = true)
|
@Handler(rejectSubtypes = true, priority = Integer.MIN_VALUE)
|
||||||
public void handle(IMultipartMessage message){
|
public void handle(IMultipartMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
@ -38,7 +38,7 @@ public class IMultipartMessageListener {
|
|||||||
|
|
||||||
public static class AsyncListener extends BaseListener {
|
public static class AsyncListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(delivery = Invoke.Asynchronously)
|
@Handler(delivery = Invoke.Asynchronously, priority = Integer.MIN_VALUE)
|
||||||
public void handle(IMultipartMessage message){
|
public void handle(IMultipartMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
@ -47,7 +47,7 @@ public class IMultipartMessageListener {
|
|||||||
|
|
||||||
public static class DisabledListener extends BaseListener {
|
public static class DisabledListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(enabled = false)
|
@Handler(enabled = false , priority = 4)
|
||||||
public void handle(IMultipartMessage message){
|
public void handle(IMultipartMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.engio.mbassy.listeners;
|
package net.engio.mbassy.listeners;
|
||||||
|
|
||||||
import net.engio.mbassy.common.DeadMessage;
|
|
||||||
import net.engio.mbassy.listener.Handler;
|
import net.engio.mbassy.listener.Handler;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -12,7 +11,7 @@ public class ObjectListener {
|
|||||||
|
|
||||||
private List handledMessages = Collections.synchronizedList(new LinkedList());
|
private List handledMessages = Collections.synchronizedList(new LinkedList());
|
||||||
|
|
||||||
@Handler
|
@Handler(priority = Integer.MAX_VALUE)
|
||||||
public void handle(Object message){
|
public void handle(Object message){
|
||||||
handledMessages.add(message);
|
handledMessages.add(message);
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,7 @@ public class StandardMessageListener {
|
|||||||
|
|
||||||
private static abstract class BaseListener {
|
private static abstract class BaseListener {
|
||||||
|
|
||||||
@Handler
|
@Handler(priority = 3)
|
||||||
public void handle(StandardMessage message){
|
public void handle(StandardMessage message){
|
||||||
message.handled(this.getClass());
|
message.handled(this.getClass());
|
||||||
}
|
}
|
||||||
@ -29,7 +29,7 @@ public class StandardMessageListener {
|
|||||||
|
|
||||||
public static class NoSubtypesListener extends BaseListener {
|
public static class NoSubtypesListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(rejectSubtypes = true)
|
@Handler(rejectSubtypes = true, priority = 4)
|
||||||
public void handle(StandardMessage message){
|
public void handle(StandardMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
@ -38,7 +38,7 @@ public class StandardMessageListener {
|
|||||||
|
|
||||||
public static class AsyncListener extends BaseListener {
|
public static class AsyncListener extends BaseListener {
|
||||||
|
|
||||||
@Handler(delivery = Invoke.Asynchronously)
|
@Handler(delivery = Invoke.Asynchronously, priority = -10)
|
||||||
public void handle(StandardMessage message){
|
public void handle(StandardMessage message){
|
||||||
super.handle(message);
|
super.handle(message);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user