Wip - getting var-args for messages/subscription working
This commit is contained in:
parent
d46a773286
commit
0202c937ce
|
@ -55,7 +55,7 @@ import net.engio.mbassy.bus.error.ErrorHandlingSupport;
|
|||
* @Author bennidi
|
||||
* Date: 2/8/12
|
||||
*/
|
||||
public interface IMessageBus<T> extends PubSubSupport<T>, ErrorHandlingSupport {
|
||||
public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
|
||||
|
||||
/**
|
||||
* Check whether any asynchronous message publications are pending to be processed
|
||||
|
|
|
@ -15,7 +15,7 @@ import net.engio.mbassy.bus.error.PublicationError;
|
|||
/**
|
||||
* The base class for all message bus implementations with support for asynchronous message dispatch
|
||||
*/
|
||||
public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
||||
public class MBassador extends AbstractPubSubSupport implements IMessageBus {
|
||||
|
||||
private final int numberOfMessageDispatchers;
|
||||
|
||||
|
@ -23,7 +23,7 @@ public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBu
|
|||
private final List<Thread> dispatchers;
|
||||
|
||||
// all pending messages scheduled for asynchronous dispatch are queued here
|
||||
private final BlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>(Integer.MAX_VALUE / 16);
|
||||
private final BlockingQueue<Object> pendingMessages = new LinkedBlockingQueue<Object>(Integer.MAX_VALUE / 16);
|
||||
|
||||
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {
|
||||
|
||||
|
@ -59,7 +59,7 @@ public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBu
|
|||
Thread dispatcher = MessageDispatchThreadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
T message = null;
|
||||
Object message = null;
|
||||
while (true) {
|
||||
try {
|
||||
message = MBassador.this.pendingMessages.take();
|
||||
|
@ -87,7 +87,7 @@ public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBu
|
|||
* @param message
|
||||
*/
|
||||
@Override
|
||||
public void publish(T message) {
|
||||
public void publish(Object message) {
|
||||
try {
|
||||
publishMessage(message);
|
||||
} catch (Throwable e) {
|
||||
|
@ -108,7 +108,7 @@ public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBu
|
|||
* @return A message publication that can be used to access information about it's state
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message) {
|
||||
public void publishAsync(Object message) {
|
||||
try {
|
||||
this.pendingMessages.put(message);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -127,7 +127,7 @@ public class MBassador<T> extends AbstractPubSubSupport<T> implements IMessageBu
|
|||
* @return A message publication that wraps up the publication request
|
||||
*/
|
||||
@Override
|
||||
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||
public void publishAsync(Object message, long timeout, TimeUnit unit) {
|
||||
try {
|
||||
this.pendingMessages.offer(message, timeout, unit);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
|
|||
* handlers will be registered and start to receive matching message publications.
|
||||
*
|
||||
*/
|
||||
public interface PubSubSupport<T> {
|
||||
public interface PubSubSupport {
|
||||
|
||||
/**
|
||||
* Subscribe all handlers of the given listener. Any listener is only subscribed once
|
||||
|
@ -39,7 +39,7 @@ public interface PubSubSupport<T> {
|
|||
*
|
||||
* @param message
|
||||
*/
|
||||
void publish(T message);
|
||||
void publish(Object message);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -49,7 +49,7 @@ public interface PubSubSupport<T> {
|
|||
* If an unbound queuing strategy is used the call returns immediately.
|
||||
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||
*/
|
||||
void publishAsync(T message);
|
||||
void publishAsync(Object message);
|
||||
|
||||
/**
|
||||
* Execute the message publication asynchronously. The behavior of this method depends on the
|
||||
|
@ -59,5 +59,5 @@ public interface PubSubSupport<T> {
|
|||
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||
* or the timeout is reached.
|
||||
*/
|
||||
void publishAsync(T message, long timeout, TimeUnit unit);
|
||||
void publishAsync(Object message, long timeout, TimeUnit unit);
|
||||
}
|
||||
|
|
|
@ -14,10 +14,8 @@ import net.engio.mbassy.subscription.SubscriptionManager;
|
|||
|
||||
/**
|
||||
* The base class for all message bus implementations.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, ErrorHandlingSupport {
|
||||
public abstract class AbstractPubSubSupport implements PubSubSupport, ErrorHandlingSupport {
|
||||
|
||||
// error handling is first-class functionality
|
||||
// this handler will receive all errors that occur during message dispatch or message handling
|
||||
|
@ -54,9 +52,11 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, Erro
|
|||
}
|
||||
}
|
||||
|
||||
protected void publishMessage(T message) {
|
||||
Class<? extends Object> class1 = message.getClass();
|
||||
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(class1);
|
||||
protected void publishMessage(Object message) {
|
||||
Class<? extends Object> messageClass = message.getClass();
|
||||
|
||||
// TODO: convert this to have N number of message types
|
||||
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(messageClass);
|
||||
|
||||
if (subscriptions == null || subscriptions.isEmpty()) {
|
||||
// Dead Event
|
||||
|
@ -78,7 +78,7 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, Erro
|
|||
|
||||
// if the message did not have any listener/handler accept it
|
||||
if (!success) {
|
||||
if (!isDeadEvent(message)) {
|
||||
if (!DeadMessage.class.equals(messageClass.getClass())) {
|
||||
// Dead Event
|
||||
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
||||
DeadMessage deadMessage = new DeadMessage(message);
|
||||
|
@ -91,10 +91,8 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, Erro
|
|||
}
|
||||
}
|
||||
|
||||
private final boolean isDeadEvent(Object message) {
|
||||
return DeadMessage.class.equals(message.getClass());
|
||||
}
|
||||
|
||||
// TODO: convert this to have N number of message types
|
||||
|
||||
// obtain the set of subscriptions for the given message type
|
||||
// Note: never returns null!
|
||||
|
|
|
@ -61,18 +61,370 @@ public class MessageHandler {
|
|||
return this.handledMessages;
|
||||
}
|
||||
|
||||
public boolean handlesMessage(Class<?> messageType) {
|
||||
for (Class<?> handledMessage : this.handledMessages) {
|
||||
if (handledMessage.equals(messageType)) {
|
||||
// todo: have 1, 2, 3 versions
|
||||
// /**
|
||||
// * @return true if the message types are handled
|
||||
// */
|
||||
// public boolean handlesMessage(Class<?> requiredMessageType, Class<?>... optionalMessageTypes) {
|
||||
// // have to check ALL of the handled messages
|
||||
//
|
||||
// Class<?>[] handledMessages = this.handledMessages;
|
||||
// // handle var-args
|
||||
// if (handledMessages.length == 1 && handledMessages[0].isArray()) {
|
||||
// Class<?> componentType = handledMessages[0].getComponentType();
|
||||
//
|
||||
// // is requiredMessageType var-arg?
|
||||
// if ((optionalMessageTypes == null || optionalMessageTypes.length == 0) &&
|
||||
// requiredMessageType.isArray()) {
|
||||
//
|
||||
// // only var-arg -> var-arg
|
||||
// return requiredMessageType.getComponentType() == componentType;
|
||||
// }
|
||||
//
|
||||
// // otherwise, it's not a var-arg (but it still might be an array!)
|
||||
//
|
||||
// // fast exit
|
||||
// if (requiredMessageType != componentType) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// // only using 1 arg
|
||||
// if (optionalMessageTypes == null || optionalMessageTypes.length == 0) {
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// // are the OPTIONAL arrays of same type??
|
||||
// if (optionalMessageTypes[0] != componentType) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// boolean ofSameType = true;
|
||||
// Class<?> typeCheck = optionalMessageTypes[0];
|
||||
// for (int i = 1; i < optionalMessageTypes.length; i++) {
|
||||
// Class<?> t1 = optionalMessageTypes[i];
|
||||
// if (t1 != typeCheck) {
|
||||
// ofSameType = false;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ofSameType && typeCheck == componentType;
|
||||
// } else {
|
||||
// // is requiredMessageType var-arg?
|
||||
// if ((optionalMessageTypes == null || optionalMessageTypes.length == 0) &&
|
||||
// requiredMessageType.isArray()) {
|
||||
//
|
||||
// // only var-arg -> var-arg (handler var-arg is first check)
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// // otherwise, it's not a var-arg (but it still might be an array!)
|
||||
//
|
||||
//
|
||||
// // fast exit
|
||||
// if (requiredMessageType != this.handledMessages[0]) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// int length = this.handledMessages.length;
|
||||
// // is arg2 var-arg?
|
||||
// if (length == 2 && this.handledMessages[1].isArray()) {
|
||||
// Class<?> componentType = handledMessages[1].getComponentType();
|
||||
//
|
||||
// // are they BOTH arrays of same type??
|
||||
// if (optionalMessageTypes.length == 1) {
|
||||
// return optionalMessageTypes[0] == componentType;
|
||||
// }
|
||||
//
|
||||
// boolean ofSameType = true;
|
||||
// Class<?> typeCheck = optionalMessageTypes[0];
|
||||
// for (int i = 1; i < optionalMessageTypes.length; i++) {
|
||||
// Class<?> t1 = optionalMessageTypes[i];
|
||||
// if (t1 != typeCheck) {
|
||||
// ofSameType = false;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ofSameType && typeCheck == componentType;
|
||||
// }
|
||||
//
|
||||
// // fast exit (check arg2)
|
||||
// if (optionalMessageTypes.length + 1 != length) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// // slow check
|
||||
// if (this.acceptsSubtypes) {
|
||||
// for (int i = 0; i < optionalMessageTypes.length; i++) {
|
||||
// Class<?> messageType = optionalMessageTypes[i];
|
||||
// Class<?> handledMessage = this.handledMessages[i+1];
|
||||
//
|
||||
// if (!handledMessage.isAssignableFrom(messageType)) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return true;
|
||||
// } else {
|
||||
// for (int i = 0; i < optionalMessageTypes.length; i++) {
|
||||
// Class<?> messageType = optionalMessageTypes[i];
|
||||
// Class<?> handledMessage = this.handledMessages[i+1];
|
||||
//
|
||||
// if (handledMessage != messageType) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return true;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// /**
|
||||
// * @return true if the message types are handled
|
||||
// */
|
||||
// public boolean handlesMessage(Class<?>... messageTypes) {
|
||||
// // have to check ALL of the handled messages
|
||||
//
|
||||
// Class<?>[] handledMessages = this.handledMessages;
|
||||
// // handle var-args
|
||||
// int length = handledMessages.length;
|
||||
//
|
||||
// if (length == 1 && handledMessages[0].isArray()) {
|
||||
// Class<?> componentType = handledMessages[0].getComponentType();
|
||||
//
|
||||
// // are they BOTH arrays of same type??
|
||||
// if (messageTypes.length == 1) {
|
||||
// return messageTypes[0].getComponentType() == componentType;
|
||||
// }
|
||||
//
|
||||
// boolean ofSameType = true;
|
||||
// Class<?> typeCheck = messageTypes[0];
|
||||
// for (int i = 1; i < messageTypes.length; i++) {
|
||||
// Class<?> t1 = messageTypes[i];
|
||||
// if (t1 != typeCheck) {
|
||||
// ofSameType = false;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ofSameType && typeCheck == componentType;
|
||||
// } else {
|
||||
// // is the last handler a var-arg?
|
||||
// if (handledMessages[length-1].isArray()) {
|
||||
//
|
||||
// // handler var-arg starting position
|
||||
// int handlerVA_startsAt = length-1;
|
||||
//
|
||||
// // do they match up to the point var-args start?
|
||||
// if (messageTypes.length >= handlerVA_startsAt) {
|
||||
// if (this.acceptsSubtypes) {
|
||||
// for (int i = 1; i < length-1; i++) {
|
||||
// Class<?> messageType = messageTypes[i];
|
||||
// Class<?> handledMessage = handledMessages[i];
|
||||
//
|
||||
// if (!handledMessage.isAssignableFrom(messageType)) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// for (int i = 1; i < length-1; i++) {
|
||||
// Class<?> messageType = messageTypes[i];
|
||||
// Class<?> handledMessage = handledMessages[i];
|
||||
//
|
||||
// if (handledMessage != messageType) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // they matched so far, do we have something for the handler var-arg?
|
||||
// if (messageTypes.length == handlerVA_startsAt) {
|
||||
// // nothing for var-arg check
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// // need var-arg check
|
||||
//
|
||||
// // how many vars in the messageType need to be checked?
|
||||
//
|
||||
//
|
||||
// Class<?> varArgCheckType = handledMessages[length-1].getComponentType();
|
||||
//
|
||||
// // are they BOTH arrays of same type??
|
||||
// if (messageTypes.length messageTypes[handlerVA_startsAt].isArray()) {
|
||||
// return messageTypes[handlerVA_startsAt].getComponentType() == varArgCheckType;
|
||||
// }
|
||||
//
|
||||
// boolean ofSameType = true;
|
||||
// Class<?> typeCheck = messageTypes[0];
|
||||
// for (int i = 1; i < messageTypes.length; i++) {
|
||||
// Class<?> t1 = messageTypes[i];
|
||||
// if (t1 != typeCheck) {
|
||||
// ofSameType = false;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ofSameType && typeCheck == varArgCheckType;
|
||||
//
|
||||
// }
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
// }
|
||||
//
|
||||
//
|
||||
// // fast exit
|
||||
// if (messageTypes.length != length) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// // fast check
|
||||
// if (Arrays.equals(handledMessages, messageTypes)) {
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// // slow check
|
||||
// if (this.acceptsSubtypes) {
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// Class<?> handledMessage = handledMessages[i];
|
||||
// Class<?> messageType = messageTypes[i];
|
||||
//
|
||||
// if (!handledMessage.isAssignableFrom(messageType)) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return true;
|
||||
// } else {
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// Class<?> handledMessage = handledMessages[i];
|
||||
// Class<?> messageType = messageTypes[i];
|
||||
//
|
||||
// if (handledMessage != messageType) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return true;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
|
||||
/**
|
||||
* @return true if the message types are handled
|
||||
*/
|
||||
public boolean handlesMessage(Class<?>... messageTypes) {
|
||||
Class<?>[] handledMessages = this.handledMessages;
|
||||
int handledLength = handledMessages.length;
|
||||
int handledLengthMinusVarArg = handledLength-1;
|
||||
|
||||
int messagesLength = messageTypes.length;
|
||||
|
||||
// do we even have enough to even CHECK the var-arg?
|
||||
if (messagesLength < handledLengthMinusVarArg) {
|
||||
// totally wrong number of args
|
||||
return false;
|
||||
}
|
||||
|
||||
// check BEFORE var-arg in handler (var-arg can ONLY be last element in array)
|
||||
if (handledLengthMinusVarArg <= messagesLength) {
|
||||
if (this.acceptsSubtypes) {
|
||||
for (int i = 0; i < handledLengthMinusVarArg; i++) {
|
||||
Class<?> handledMessage = handledMessages[i];
|
||||
Class<?> messageType = messageTypes[i];
|
||||
|
||||
if (!handledMessage.isAssignableFrom(messageType)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int i = 0; i < handledLengthMinusVarArg; i++) {
|
||||
Class<?> handledMessage = handledMessages[i];
|
||||
Class<?> messageType = messageTypes[i];
|
||||
|
||||
if (handledMessage != messageType) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// do we even HAVE var-arg?
|
||||
if (!handledMessages[handledLengthMinusVarArg].isArray()) {
|
||||
// DO NOT HAVE VAR_ARG PRESENT IN HANDLERS
|
||||
|
||||
// fast exit
|
||||
if (handledLength != messagesLength) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// compare remaining arg
|
||||
Class<?> handledMessage = handledMessages[handledLengthMinusVarArg];
|
||||
Class<?> messageType = messageTypes[handledLengthMinusVarArg];
|
||||
|
||||
if (this.acceptsSubtypes) {
|
||||
if (!handledMessage.isAssignableFrom(messageType)) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (handledMessage != messageType) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// all args are dandy
|
||||
return true;
|
||||
}
|
||||
|
||||
if (handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) {
|
||||
// WE HAVE VAR_ARG PRESENT IN HANDLER
|
||||
|
||||
// do we have enough args to NEED to check the var-arg?
|
||||
if (handledLengthMinusVarArg == messagesLength) {
|
||||
// var-arg doesn't need checking
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// then check var-arg in handler
|
||||
|
||||
// all the args to check for the var-arg MUST be the same! (ONLY ONE ARRAY THOUGH CAN BE PRESENT)
|
||||
int messagesLengthMinusVarArg = messagesLength-1;
|
||||
|
||||
Class<?> typeCheck = messageTypes[handledLengthMinusVarArg];
|
||||
for (int i = handledLengthMinusVarArg; i < messagesLength; i++) {
|
||||
Class<?> t1 = messageTypes[i];
|
||||
if (t1 != typeCheck) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// if we got this far, then the args are the same type. IF we have more than one, AND they are arrays, NOPE!
|
||||
if (messagesLength - handledLengthMinusVarArg > 1 && messageTypes[messagesLengthMinusVarArg].isArray()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// are we comparing array -> array or string -> array
|
||||
Class<?> componentType;
|
||||
if (messageTypes[messagesLengthMinusVarArg].isArray()) {
|
||||
componentType = handledMessages[handledLengthMinusVarArg];
|
||||
} else {
|
||||
componentType = handledMessages[handledLengthMinusVarArg].getComponentType();
|
||||
}
|
||||
|
||||
if (this.acceptsSubtypes) {
|
||||
return componentType.isAssignableFrom(typeCheck);
|
||||
} else {
|
||||
return typeCheck == componentType;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean acceptsSubtypes() {
|
||||
return this.acceptsSubtypes;
|
||||
|
|
|
@ -2,7 +2,6 @@ package net.engio.mbassy.listener;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -46,20 +45,6 @@ public class MessageListener {
|
|||
return this.handlers;
|
||||
}
|
||||
|
||||
public List<MessageHandler> getHandlers(Class<?> messageType) {
|
||||
List<MessageHandler> matching = new LinkedList<MessageHandler>();
|
||||
for (MessageHandler handler : this.handlers) {
|
||||
if (handler.handlesMessage(messageType)) {
|
||||
matching.add(handler);
|
||||
}
|
||||
}
|
||||
return matching;
|
||||
}
|
||||
|
||||
public boolean handles(Class<?> messageType) {
|
||||
return !getHandlers(messageType).isEmpty();
|
||||
}
|
||||
|
||||
public Class<?> getListerDefinition() {
|
||||
return this.listenerDefinition;
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public class MetadataReader {
|
|||
// but an overriding method does inherit the listener configuration of the overwritten method
|
||||
for (Method handler : bottomMostHandlers) {
|
||||
Handler handlerConfig = ReflectionUtils.getAnnotation( handler, Handler.class);
|
||||
if (!handlerConfig.enabled() || !isValidMessageHandler(handler)) {
|
||||
if (handlerConfig == null || !handlerConfig.enabled()) {
|
||||
continue; // disabled or invalid listeners are ignored
|
||||
}
|
||||
|
||||
|
@ -52,22 +52,4 @@ public class MetadataReader {
|
|||
}
|
||||
return listenerMetadata;
|
||||
}
|
||||
|
||||
|
||||
|
||||
//TODO: change this to support MORE THAN ONE object in the signature
|
||||
private boolean isValidMessageHandler(Method handler) {
|
||||
if (handler == null || ReflectionUtils.getAnnotation( handler, Handler.class) == null) {
|
||||
return false;
|
||||
}
|
||||
if (handler.getParameterTypes().length != 1) {
|
||||
// a messageHandler only defines one parameter (the message)
|
||||
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
|
||||
+ "]. A messageHandler must define exactly one parameter");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,15 +35,15 @@ public class SubscriptionManager {
|
|||
// all subscriptions per message type
|
||||
// this is the primary list for dispatching a specific message
|
||||
// write access is synchronized and happens only when a listener of a specific class is registered the first time
|
||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessage
|
||||
= new HashMap<Class<?>, Collection<Subscription>>(50);
|
||||
// private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessage = new HashMap<Class<?>, Collection<Subscription>>(50);
|
||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessage = new HashMap<Class<?>, Collection<Subscription>>(50);
|
||||
|
||||
|
||||
// all subscriptions per messageHandler type
|
||||
// this map provides fast access for subscribing and unsubscribing
|
||||
// write access is synchronized and happens very infrequently
|
||||
// once a collection of subscriptions is stored it does not change
|
||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener
|
||||
= new HashMap<Class<?>, Collection<Subscription>>(50);
|
||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener = new HashMap<Class<?>, Collection<Subscription>>(50);
|
||||
|
||||
// remember already processed classes that do not contain any message handlers
|
||||
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
|
||||
|
@ -88,23 +88,26 @@ public class SubscriptionManager {
|
|||
Class<? extends Object> listenerClass = listener.getClass();
|
||||
|
||||
if (this.nonListeners.contains(listenerClass)) {
|
||||
return; // early reject of known classes that do not define message handlers
|
||||
// early reject of known classes that do not define message handlers
|
||||
return;
|
||||
}
|
||||
|
||||
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
|
||||
// a listener is either subscribed for the first time
|
||||
if (subscriptionsByListener == null) {
|
||||
List<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
|
||||
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
|
||||
if (messageHandlers.isEmpty()) {
|
||||
// remember the class as non listening class if no handlers are found
|
||||
this.nonListeners.put(listenerClass, this.nonListeners);
|
||||
return;
|
||||
}
|
||||
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
|
||||
|
||||
// it's safe to use non-concurrent collection here (read only)
|
||||
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size());
|
||||
|
||||
// create subscriptions for all detected message handlers
|
||||
for (MessageHandler messageHandler : messageHandlers) {
|
||||
// create the subscription
|
||||
|
||||
try {
|
||||
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
|
||||
|
||||
|
@ -122,13 +125,13 @@ public class SubscriptionManager {
|
|||
// this will acquire a write lock and handle the case when another thread already subscribed
|
||||
// this particular listener in the mean-time
|
||||
subscribe(listener, subscriptionsByListener);
|
||||
} // or the subscriptions already exist and must only be updated
|
||||
}
|
||||
else {
|
||||
// or the subscriptions already exist and must only be updated
|
||||
for (Subscription sub : subscriptionsByListener) {
|
||||
sub.subscribe(listener);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -164,10 +167,14 @@ public class SubscriptionManager {
|
|||
} finally {
|
||||
this.readWriteLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// TODO: convert this to have N number of message types
|
||||
|
||||
// obtain the set of subscriptions for the given message type
|
||||
// Note: never returns null!
|
||||
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
|
||||
|
|
|
@ -18,7 +18,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
@Test
|
||||
public void testSingleThreadedSyncFIFO(){
|
||||
// create a fifo bus with 1000 concurrently subscribed listeners
|
||||
IMessageBus<Integer> fifoBUs = new MBassador<Integer>();
|
||||
IMessageBus fifoBUs = new MBassador();
|
||||
|
||||
List<Listener> listeners = new LinkedList<Listener>();
|
||||
for(int i = 0; i < 1000 ; i++){
|
||||
|
@ -53,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
|
|||
@Test
|
||||
public void testSingleThreadedSyncAsyncFIFO(){
|
||||
// create a fifo bus with 1000 concurrently subscribed listeners
|
||||
IMessageBus<Integer> fifoBUs = new MBassador<Integer>(1);
|
||||
IMessageBus fifoBUs = new MBassador(1);
|
||||
|
||||
List<Listener> listeners = new LinkedList<Listener>();
|
||||
for(int i = 0; i < 1000 ; i++){
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package net.engio.mbassy;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import net.engio.mbassy.annotations.Handler;
|
||||
import net.engio.mbassy.common.AssertSupport;
|
||||
import net.engio.mbassy.listener.MessageHandler;
|
||||
import net.engio.mbassy.listener.MessageListener;
|
||||
import net.engio.mbassy.listener.MetadataReader;
|
||||
|
||||
|
@ -20,72 +23,131 @@ public class MetadataReaderTest extends AssertSupport {
|
|||
|
||||
private MetadataReader reader = new MetadataReader();
|
||||
|
||||
@Test
|
||||
public void testListenerWithoutInheritance() {
|
||||
MessageListener listener = this.reader.getMessageListener(MessageListener1.class);
|
||||
ListenerValidator validator = new ListenerValidator()
|
||||
.expectHandlers(2, String.class)
|
||||
.expectHandlers(2, Object.class)
|
||||
.expectHandlers(1, BufferedReader.class);
|
||||
validator.check(listener);
|
||||
// @Test
|
||||
// public void testListenerWithoutInheritance() {
|
||||
// MessageListener listener = this.reader.getMessageListener(MessageListener1.class);
|
||||
// ListenerValidator validator = new ListenerValidator()
|
||||
// .expectHandlers(2, String.class)
|
||||
// .expectHandlers(2, Object.class)
|
||||
// .expectHandlers(1, BufferedReader.class);
|
||||
// validator.check(listener);
|
||||
// }
|
||||
//
|
||||
// /*
|
||||
// public void testInterfaced() {
|
||||
// MessageListener listener = reader.getMessageListener(InterfacedListener.class);
|
||||
// ListenerValidator validator = new ListenerValidator()
|
||||
// .expectHandlers(1, Object.class);
|
||||
// validator.check(listener);
|
||||
// } WIP */
|
||||
//
|
||||
//
|
||||
// @Test
|
||||
// public void testListenerWithInheritance() {
|
||||
// MessageListener listener = this.reader.getMessageListener(MessageListener2.class);
|
||||
// ListenerValidator validator = new ListenerValidator()
|
||||
// .expectHandlers(2, String.class)
|
||||
// .expectHandlers(2, Object.class)
|
||||
// .expectHandlers(1, BufferedReader.class);
|
||||
// validator.check(listener);
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testListenerWithInheritanceOverriding() {
|
||||
// MessageListener listener = this.reader.getMessageListener(MessageListener3.class);
|
||||
//
|
||||
// ListenerValidator validator = new ListenerValidator()
|
||||
// .expectHandlers(0, String.class)
|
||||
// .expectHandlers(2, Object.class)
|
||||
// .expectHandlers(0, BufferedReader.class);
|
||||
// validator.check(listener);
|
||||
// }
|
||||
|
||||
public static class NClasses {
|
||||
final Class<?>[] messageTypes;
|
||||
|
||||
public NClasses(Class<?> nClass) {
|
||||
this.messageTypes = new Class<?>[] {nClass};
|
||||
}
|
||||
|
||||
/*
|
||||
public void testInterfaced() {
|
||||
MessageListener listener = reader.getMessageListener(InterfacedListener.class);
|
||||
ListenerValidator validator = new ListenerValidator()
|
||||
.expectHandlers(1, Object.class);
|
||||
validator.check(listener);
|
||||
} WIP */
|
||||
|
||||
|
||||
@Test
|
||||
public void testListenerWithInheritance() {
|
||||
MessageListener listener = this.reader.getMessageListener(MessageListener2.class);
|
||||
ListenerValidator validator = new ListenerValidator()
|
||||
.expectHandlers(2, String.class)
|
||||
.expectHandlers(2, Object.class)
|
||||
.expectHandlers(1, BufferedReader.class);
|
||||
validator.check(listener);
|
||||
public NClasses(Class<?>... messageTypes) {
|
||||
this.messageTypes = messageTypes;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListenerWithInheritanceOverriding() {
|
||||
MessageListener listener = this.reader.getMessageListener(MessageListener3.class);
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + Arrays.hashCode(this.messageTypes);
|
||||
return result;
|
||||
}
|
||||
|
||||
ListenerValidator validator = new ListenerValidator()
|
||||
.expectHandlers(0, String.class)
|
||||
.expectHandlers(2, Object.class)
|
||||
.expectHandlers(0, BufferedReader.class);
|
||||
validator.check(listener);
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
NClasses other = (NClasses) obj;
|
||||
if (!Arrays.equals(this.messageTypes, other.messageTypes)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private class ListenerValidator {
|
||||
private Map<NClasses, Integer> handlers = new HashMap<NClasses, Integer>();
|
||||
|
||||
private Map<Class<?>, Integer> handlers = new HashMap<Class<?>, Integer>();
|
||||
public ListenerValidator expectHandlers(Integer count, Class<?> requiredMessageType) {
|
||||
NClasses nClasses = new NClasses(requiredMessageType);
|
||||
|
||||
public ListenerValidator expectHandlers(Integer count, Class<?> messageType){
|
||||
this.handlers.put(messageType, count);
|
||||
this.handlers.put(nClasses, count);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ListenerValidator expectHandlers(Integer count, Class<?>... messageTypes) {
|
||||
NClasses nClasses = new NClasses(messageTypes);
|
||||
|
||||
this.handlers.put(nClasses, count);
|
||||
return this;
|
||||
}
|
||||
|
||||
public void check(MessageListener listener){
|
||||
for(Map.Entry<Class<?>, Integer> expectedHandler: this.handlers.entrySet()){
|
||||
for (Map.Entry<NClasses, Integer> expectedHandler: this.handlers.entrySet()) {
|
||||
NClasses key = expectedHandler.getKey();
|
||||
List<MessageHandler> handlers2 = getHandlers(listener, key.messageTypes);
|
||||
|
||||
if (expectedHandler.getValue() > 0){
|
||||
assertTrue(listener.handles(expectedHandler.getKey()));
|
||||
assertTrue(!handlers2.isEmpty());
|
||||
}
|
||||
else{
|
||||
assertFalse(listener.handles(expectedHandler.getKey()));
|
||||
}
|
||||
assertEquals(expectedHandler.getValue(), listener.getHandlers(expectedHandler.getKey()).size());
|
||||
assertFalse(!handlers2.isEmpty());
|
||||
}
|
||||
assertEquals(expectedHandler.getValue(), handlers2.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// for testing
|
||||
public List<MessageHandler> getHandlers(MessageListener listener, Class<?>... messageTypes) {
|
||||
List<MessageHandler> matching = new LinkedList<MessageHandler>();
|
||||
for (MessageHandler handler : listener.getHandlers()) {
|
||||
if (handler.handlesMessage(messageTypes)) {
|
||||
matching.add(handler);
|
||||
}
|
||||
}
|
||||
return matching;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// a simple event listener
|
||||
@SuppressWarnings("unused")
|
||||
public class MessageListener1 {
|
||||
|
||||
@Handler(rejectSubtypes = true)
|
||||
|
@ -103,7 +165,6 @@ public class MetadataReaderTest extends AssertSupport {
|
|||
public void handleString(String s) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// the same handlers as its super class
|
||||
|
@ -114,7 +175,6 @@ public class MetadataReaderTest extends AssertSupport {
|
|||
public void handleString(String s) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class MessageListener3 extends MessageListener2 {
|
||||
|
@ -131,6 +191,57 @@ public class MetadataReaderTest extends AssertSupport {
|
|||
public void handleString(String s) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testMultipleSignatureListenerWithoutInheritance() {
|
||||
MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class);
|
||||
ListenerValidator validator = new ListenerValidator()
|
||||
.expectHandlers(7, String.class)
|
||||
.expectHandlers(9, String.class, String.class)
|
||||
.expectHandlers(9, String.class, String.class, String.class)
|
||||
.expectHandlers(3, String.class, String[].class)
|
||||
.expectHandlers(1, String.class, String[].class, String[].class)
|
||||
.expectHandlers(6, String[].class)
|
||||
.expectHandlers(3, String[].class, String[].class)
|
||||
.expectHandlers(2, Object.class)
|
||||
.expectHandlers(2, String.class, Object.class)
|
||||
.expectHandlers(2, String.class, Object[].class)
|
||||
;
|
||||
validator.check(listener);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public class MultiMessageListener1 {
|
||||
|
||||
@Handler public void handleString1(String s) {}
|
||||
@Handler public void handleString2(String s, String s1) {}
|
||||
@Handler public void handleString3(String s, String s1, String s2) {}
|
||||
|
||||
@Handler public void handleStringN(String... s1) {}
|
||||
@Handler public void handleStringArray(String[] s1) {}
|
||||
|
||||
@Handler public void handleStringN(Object... s1) {}
|
||||
@Handler public void handleStringArray(Object[] s1) {}
|
||||
|
||||
@Handler public void handleString1plusN(String s, String... s1) {}
|
||||
@Handler public void handleString1plusN(String s, Object... s1) {}
|
||||
|
||||
@Handler public void handleString2plusN(String s, String s1, String... s2) {}
|
||||
@Handler public void handleString2plusN(String s, Object s1, String... s2) {}
|
||||
|
||||
@Handler public void handleStringXplus1(String[] s, String s1) {}
|
||||
|
||||
@Handler public void handleStringXplusN(String[] s, String... s1) {}
|
||||
@Handler public void handleStringXplusN(String[] s, Object... s1) {}
|
||||
|
||||
@Handler public void handleStringXplus1plusN(String[] s, String s1, String... s2) {}
|
||||
@Handler public void handleStringXplus1plusN(String[] s, String s1, Object... o) {}
|
||||
|
||||
@Handler public void handleStringXplus1plusN(String[] s, Object o, Object... o1) {}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user