commit
eb555ca1af
29
README.md
29
README.md
@ -11,7 +11,7 @@ Read this documentation to get an overview of MBassadors features. There is also
|
|||||||
not enough to make a developer happy (work is in progress). But usage of publish subscribe pattern at its core is pretty straight forward and the basic
|
not enough to make a developer happy (work is in progress). But usage of publish subscribe pattern at its core is pretty straight forward and the basic
|
||||||
use cases are very easy to understand and implement.
|
use cases are very easy to understand and implement.
|
||||||
|
|
||||||
The current version is 1.1.8 and it is available from the Maven Central Repository. See the release notes for more details.
|
The current version is 1.1.9 and it is available from the Maven Central Repository. See the release notes for more details.
|
||||||
|
|
||||||
There is also an extension available to support CDI-like transactional message sending in a Spring environment. It's beta but
|
There is also an extension available to support CDI-like transactional message sending in a Spring environment. It's beta but
|
||||||
stable enough to give it a try. See <a href="https://github.com/bennidi/mbassador-spring" target="_blank">here</a>.
|
stable enough to give it a try. See <a href="https://github.com/bennidi/mbassador-spring" target="_blank">here</a>.
|
||||||
@ -41,7 +41,7 @@ asynchronously. This is configurable for each handler via annotations. Message p
|
|||||||
blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
|
blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
|
||||||
+ <em><strong>Weak references</em></strong>: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
|
+ <em><strong>Weak references</em></strong>: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
|
||||||
listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable
|
listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable
|
||||||
in certain environments where listeners are managed by frameworks, i.e. spring, guice etc. Just stuff everything into the message bus, it will
|
in certain environments where listeners are managed by frameworks, i.e. Spring, Guice etc. Just stuff everything into the message bus, it will
|
||||||
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
|
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
|
||||||
+ <em><strong>Strong references</em></strong>: Instead of using weak references, a listener can be configured to be referenced using strong references using @Listener
|
+ <em><strong>Strong references</em></strong>: Instead of using weak references, a listener can be configured to be referenced using strong references using @Listener
|
||||||
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
|
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
|
||||||
@ -143,7 +143,7 @@ Beginning with version 1.1.0 MBassador is available from the Maven Central Repos
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>net.engio</groupId>
|
<groupId>net.engio</groupId>
|
||||||
<artifactId>mbassador</artifactId>
|
<artifactId>mbassador</artifactId>
|
||||||
<version>1.1.7</version>
|
<version>1.1.9</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -151,25 +151,29 @@ Beginning with version 1.1.0 MBassador is available from the Maven Central Repos
|
|||||||
Of course you can always clone the repository and build from source.
|
Of course you can always clone the repository and build from source.
|
||||||
|
|
||||||
<h2>Wiki</h2>
|
<h2>Wiki</h2>
|
||||||
There is ongoing afford to extend documentation and provide code samples and detailed explanations of how the message bus
|
There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus
|
||||||
works. Code samples can also be found in the various test cases. Please read about the terminology used in this project
|
works. Code samples can also be found in the various test cases. Please read about the terminology used in this project
|
||||||
to avoid confusion and misunderstanding.
|
to avoid confusion and misunderstanding.
|
||||||
|
|
||||||
<h2>Release Notes</h2>
|
<h2>Release Notes</h2>
|
||||||
|
|
||||||
|
<h3>1.1.9</h3>
|
||||||
|
|
||||||
|
+ Fixed memory leak reported in issue #53
|
||||||
|
|
||||||
<h3>1.1.8</h3>
|
<h3>1.1.8</h3>
|
||||||
|
|
||||||
+ Internal refactorings and code improvements
|
+ Internal refactorings and code improvements
|
||||||
+ Fixed #44 #45 #47
|
+ Fixed #44 #45 #47
|
||||||
+ NOTE: This release has a known issue with weak references which introduces a memory leak. A fix is on its way for 1.1.9
|
+ NOTE: This release has a known issue with weak references which introduces a memory leak and is fixed in 1.1.9. The
|
||||||
to be released soon
|
version 1.1.8 is not available from the central repository
|
||||||
|
|
||||||
|
|
||||||
<h3>1.1.7</h3>
|
<h3>1.1.7</h3>
|
||||||
|
|
||||||
+ Console Logger not added to message bus instances by default -> use addErrorHandler(IPublicationErrorHandler.ConsoleLogger)
|
+ Console Logger not added to message bus instances by default -> use addErrorHandler(IPublicationErrorHandler.ConsoleLogger)
|
||||||
+ Fixed race conditions in subscription and of WeakConcurrentSet.contains()
|
+ Fixed race conditions in net.engio.mbassy.subscription.Subscription and of WeakConcurrentSet.contains()
|
||||||
+ Improved message hierarchy handling: Now interfaces, enums , (abstract) classes all work in all combinations
|
+ Improved message hierarchy handling: Now interfaces, enums , (abstract) classes should work in all combinations
|
||||||
+ Prevented dispatcher threads from dying on exceptions
|
+ Prevented dispatcher threads from dying on exceptions
|
||||||
+ Improved test-infrastructure and increased test-coverage
|
+ Improved test-infrastructure and increased test-coverage
|
||||||
+ Thanks for your feedback!
|
+ Thanks for your feedback!
|
||||||
@ -232,13 +236,14 @@ First stable release!
|
|||||||
|
|
||||||
|
|
||||||
<h2>Roadmap</h2>
|
<h2>Roadmap</h2>
|
||||||
Check the issues marked with label enhancement. Comment if you would like to see the feature in a future release.
|
Check the issues labeled 'enhancement'. Comment if you would like to see the feature in a future release and/or want to share
|
||||||
|
your ideas on the feature (or a variation thereof).
|
||||||
Please understand that I have limited time to include new features and that I will focus on stability and cleaner APIs.
|
Please understand that I have limited time to include new features and that I will focus on stability and cleaner APIs.
|
||||||
Adding features only works well with well designed and thoroughly tested components especially with all this multi-threaded code
|
Adding features only works with well designed and thoroughly tested components. This is especially true for multi-threaded code
|
||||||
and I am still not 100 percent happy with the existing test coverage.
|
and I am still not 100 percent happy with the existing test design and some parts of the internal code layout.
|
||||||
|
|
||||||
Planned for release:Spring integration with support for conditional message dispatch in transactional context (dispatch only after
|
Planned for release:Spring integration with support for conditional message dispatch in transactional context (dispatch only after
|
||||||
successful commit etc.). Currently in beta, see <a href="https://github.com/bennidi/mbassador-spring">this</a> repository
|
successful transaction commit etc.). Currently in beta, see <a href="https://github.com/bennidi/mbassador-spring">this</a> repository
|
||||||
|
|
||||||
|
|
||||||
<h2>Credits</h2>
|
<h2>Credits</h2>
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
MBassador allows a variety of message handler configurations that will affect how a message
|
MBassador understands a variety of message handler configurations that will affect how a message
|
||||||
is delivered to a specific listener. There are properties to control the handling of subclasses
|
is delivered to a specific listener. There are properties to control the handling of subclasses
|
||||||
of the specified message (the method parameter), the execution order of handlers for the same message type,
|
of the specified message (the method parameter), the execution order of handlers for the same message type,
|
||||||
filters, delivery modes etc.
|
filters, delivery modes etc.
|
||||||
|
@ -12,7 +12,7 @@ public class Filters {
|
|||||||
public static final class AllowAll implements IMessageFilter {
|
public static final class AllowAll implements IMessageFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
|
public boolean accepts(Object event, MessageHandler metadata) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -20,7 +20,7 @@ public class Filters {
|
|||||||
public static final class RejectAll implements IMessageFilter {
|
public static final class RejectAll implements IMessageFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
|
public boolean accepts(Object event, MessageHandler metadata) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -29,7 +29,7 @@ public class Filters {
|
|||||||
public static final class RejectSubtypes implements IMessageFilter {
|
public static final class RejectSubtypes implements IMessageFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
|
public boolean accepts(Object event, MessageHandler metadata) {
|
||||||
for (Class handledMessage : metadata.getHandledMessages()) {
|
for (Class handledMessage : metadata.getHandledMessages()) {
|
||||||
if (handledMessage.equals(event.getClass())) {
|
if (handledMessage.equals(event.getClass())) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -17,5 +17,5 @@ public interface IMessageFilter {
|
|||||||
* @param message the message to be delivered
|
* @param message the message to be delivered
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
boolean accepts(Object message, MessageHandlerMetadata metadata);
|
boolean accepts(Object message, MessageHandler metadata);
|
||||||
}
|
}
|
||||||
|
@ -7,10 +7,14 @@ import java.util.LinkedList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
|
||||||
|
* the handler defines the message listener and more generally, any class containing a message handler in its class hierarchy
|
||||||
|
* defines a message listener.
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 11/14/12
|
* Date: 11/14/12
|
||||||
*/
|
*/
|
||||||
public class MessageHandlerMetadata {
|
public class MessageHandler {
|
||||||
|
|
||||||
private final Method handler;
|
private final Method handler;
|
||||||
|
|
||||||
@ -26,14 +30,14 @@ public class MessageHandlerMetadata {
|
|||||||
|
|
||||||
private final boolean acceptsSubtypes;
|
private final boolean acceptsSubtypes;
|
||||||
|
|
||||||
private final MessageListenerMetadata listenerConfig;
|
private final MessageListener listenerConfig;
|
||||||
|
|
||||||
private final boolean isSynchronized;
|
private final boolean isSynchronized;
|
||||||
|
|
||||||
private Class listeningClass;
|
private Class listeningClass;
|
||||||
|
|
||||||
|
|
||||||
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig, MessageListenerMetadata listenerConfig) {
|
public MessageHandler(Method handler, IMessageFilter[] filter, Handler handlerConfig, MessageListener listenerConfig) {
|
||||||
if(handler == null || handlerConfig == null){
|
if(handler == null || handlerConfig == null){
|
||||||
throw new IllegalArgumentException("The message handler configuration may not be null");
|
throw new IllegalArgumentException("The message handler configuration may not be null");
|
||||||
}
|
}
|
87
src/main/java/net/engio/mbassy/listener/MessageListener.java
Normal file
87
src/main/java/net/engio/mbassy/listener/MessageListener.java
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package net.engio.mbassy.listener;
|
||||||
|
|
||||||
|
import net.engio.mbassy.common.IPredicate;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus,
|
||||||
|
* a message listener is any object capable of receiving messages by means of defined message handlers.
|
||||||
|
* There are no restrictions about the number of allowed message handlers in a message listener.
|
||||||
|
*
|
||||||
|
* A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler
|
||||||
|
* definition it contains.
|
||||||
|
*
|
||||||
|
* This class is an internal representation of a message listener used to encapsulate all relevant objects
|
||||||
|
* and data about that message listener, especially all its handlers.
|
||||||
|
* There will be only one instance of MessageListener per message listener class and message bus instance.
|
||||||
|
*
|
||||||
|
* @author bennidi
|
||||||
|
* Date: 12/16/12
|
||||||
|
*/
|
||||||
|
public class MessageListener<T> {
|
||||||
|
|
||||||
|
|
||||||
|
public static IPredicate<MessageHandler> ForMessage(final Class<?> messageType) {
|
||||||
|
return new IPredicate<MessageHandler>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(MessageHandler target) {
|
||||||
|
return target.handlesMessage(messageType);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
|
||||||
|
|
||||||
|
private Class<T> listenerDefinition;
|
||||||
|
|
||||||
|
private Listener listenerAnnotation;
|
||||||
|
|
||||||
|
public MessageListener(Class<T> listenerDefinition) {
|
||||||
|
this.listenerDefinition = listenerDefinition;
|
||||||
|
listenerAnnotation = listenerDefinition.getAnnotation(Listener.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean isFromListener(Class listener){
|
||||||
|
return listenerDefinition.equals(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean useStrongReferences(){
|
||||||
|
return listenerAnnotation != null && listenerAnnotation.references().equals(References.Strong);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageListener addHandlers(Collection<? extends MessageHandler> c) {
|
||||||
|
handlers.addAll(c);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean addHandler(MessageHandler messageHandler) {
|
||||||
|
return handlers.add(messageHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MessageHandler> getHandlers(){
|
||||||
|
return handlers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MessageHandler> getHandlers(IPredicate<MessageHandler> filter) {
|
||||||
|
List<MessageHandler> matching = new LinkedList<MessageHandler>();
|
||||||
|
for (MessageHandler handler : handlers) {
|
||||||
|
if (filter.apply(handler)) {
|
||||||
|
matching.add(handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return matching;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean handles(Class<?> messageType) {
|
||||||
|
return !getHandlers(ForMessage(messageType)).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<T> getListerDefinition() {
|
||||||
|
return listenerDefinition;
|
||||||
|
}
|
||||||
|
}
|
@ -1,79 +0,0 @@
|
|||||||
package net.engio.mbassy.listener;
|
|
||||||
|
|
||||||
import net.engio.mbassy.common.IPredicate;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Provides information about the message listeners of a specific class. Each message handler
|
|
||||||
* defined by the target class is represented as a single entity.
|
|
||||||
*
|
|
||||||
* @author bennidi
|
|
||||||
* Date: 12/16/12
|
|
||||||
*/
|
|
||||||
public class MessageListenerMetadata<T> {
|
|
||||||
|
|
||||||
|
|
||||||
public static IPredicate<MessageHandlerMetadata> ForMessage(final Class<?> messageType) {
|
|
||||||
return new IPredicate<MessageHandlerMetadata>() {
|
|
||||||
@Override
|
|
||||||
public boolean apply(MessageHandlerMetadata target) {
|
|
||||||
return target.handlesMessage(messageType);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<MessageHandlerMetadata> handlers = new ArrayList<MessageHandlerMetadata>();
|
|
||||||
|
|
||||||
private Class<T> listenerDefinition;
|
|
||||||
|
|
||||||
private Listener listenerAnnotation;
|
|
||||||
|
|
||||||
public MessageListenerMetadata(Class<T> listenerDefinition) {
|
|
||||||
this.listenerDefinition = listenerDefinition;
|
|
||||||
listenerAnnotation = listenerDefinition.getAnnotation(Listener.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public boolean isFromListener(Class listener){
|
|
||||||
return listenerDefinition.equals(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean useStrongReferences(){
|
|
||||||
return listenerAnnotation != null && listenerAnnotation.references().equals(References.Strong);
|
|
||||||
}
|
|
||||||
|
|
||||||
public MessageListenerMetadata addHandlers(Collection<? extends MessageHandlerMetadata> c) {
|
|
||||||
handlers.addAll(c);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean addHandler(MessageHandlerMetadata messageHandlerMetadata) {
|
|
||||||
return handlers.add(messageHandlerMetadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<MessageHandlerMetadata> getHandlers(){
|
|
||||||
return handlers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter) {
|
|
||||||
List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>();
|
|
||||||
for (MessageHandlerMetadata handler : handlers) {
|
|
||||||
if (filter.apply(handler)) {
|
|
||||||
matching.add(handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return matching;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean handles(Class<?> messageType) {
|
|
||||||
return !getHandlers(ForMessage(messageType)).isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Class<T> getListerDefinition() {
|
|
||||||
return listenerDefinition;
|
|
||||||
}
|
|
||||||
}
|
|
@ -55,8 +55,8 @@ public class MetadataReader {
|
|||||||
|
|
||||||
// get all listeners defined by the given class (includes
|
// get all listeners defined by the given class (includes
|
||||||
// listeners defined in super classes)
|
// listeners defined in super classes)
|
||||||
public MessageListenerMetadata getMessageListener(Class target) {
|
public MessageListener getMessageListener(Class target) {
|
||||||
MessageListenerMetadata listenerMetadata = new MessageListenerMetadata(target);
|
MessageListener listenerMetadata = new MessageListener(target);
|
||||||
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
|
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
|
||||||
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
||||||
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
|
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
|
||||||
@ -76,7 +76,7 @@ public class MetadataReader {
|
|||||||
}
|
}
|
||||||
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
|
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
|
||||||
// if a handler is overwritten it inherits the configuration of its parent method
|
// if a handler is overwritten it inherits the configuration of its parent method
|
||||||
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
|
MessageHandler handlerMetadata = new MessageHandler(overriddenHandler == null ? handler : overriddenHandler,
|
||||||
getFilter(handlerConfig), handlerConfig, listenerMetadata);
|
getFilter(handlerConfig), handlerConfig, listenerMetadata);
|
||||||
listenerMetadata.addHandler(handlerMetadata);
|
listenerMetadata.addHandler(handlerMetadata);
|
||||||
|
|
||||||
|
@ -9,7 +9,16 @@ import java.util.List;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A subscription is a thread safe container for objects that contain message handlers
|
* A subscription is a thread-safe container that manages exactly one message handler of all registered
|
||||||
|
* message listeners of the same class, i.e. all subscribed instances of a SingleMessageHandler.class
|
||||||
|
* will be referenced in the subscription created for SingleMessageHandler.class.
|
||||||
|
*
|
||||||
|
* There will be as many unique subscription objects per message listener class as there are message handlers
|
||||||
|
* defined in the message listeners class hierarchy.
|
||||||
|
*
|
||||||
|
* The subscription provides functionality for message publication by means of delegation to the respective
|
||||||
|
* message dispatcher.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
public class Subscription {
|
public class Subscription {
|
||||||
|
|
||||||
@ -27,14 +36,30 @@ public class Subscription {
|
|||||||
this.listeners = listeners;
|
this.listeners = listeners;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether this subscription manages a message handler of the given message listener class
|
||||||
|
*
|
||||||
|
* @param listener
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public boolean belongsTo(Class listener){
|
public boolean belongsTo(Class listener){
|
||||||
return context.getHandlerMetadata().isFromListener(listener);
|
return context.getHandlerMetadata().isFromListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether this subscriptions manages the given listener instance
|
||||||
|
* @param listener
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public boolean contains(Object listener){
|
public boolean contains(Object listener){
|
||||||
return listeners.contains(listener);
|
return listeners.contains(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether this subscription manages a message handler
|
||||||
|
* @param messageType
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public boolean handlesMessageType(Class<?> messageType) {
|
public boolean handlesMessageType(Class<?> messageType) {
|
||||||
return context.getHandlerMetadata().handlesMessage(messageType);
|
return context.getHandlerMetadata().handlesMessage(messageType);
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ package net.engio.mbassy.subscription;
|
|||||||
import net.engio.mbassy.IPublicationErrorHandler;
|
import net.engio.mbassy.IPublicationErrorHandler;
|
||||||
import net.engio.mbassy.bus.BusRuntime;
|
import net.engio.mbassy.bus.BusRuntime;
|
||||||
import net.engio.mbassy.bus.RuntimeProvider;
|
import net.engio.mbassy.bus.RuntimeProvider;
|
||||||
import net.engio.mbassy.listener.MessageHandlerMetadata;
|
import net.engio.mbassy.listener.MessageHandler;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
@ -19,14 +19,14 @@ import java.util.Collection;
|
|||||||
public class SubscriptionContext implements RuntimeProvider {
|
public class SubscriptionContext implements RuntimeProvider {
|
||||||
|
|
||||||
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
|
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
|
||||||
private final MessageHandlerMetadata handlerMetadata;
|
private final MessageHandler handlerMetadata;
|
||||||
|
|
||||||
// error handling is first-class functionality
|
// error handling is first-class functionality
|
||||||
private final Collection<IPublicationErrorHandler> errorHandlers;
|
private final Collection<IPublicationErrorHandler> errorHandlers;
|
||||||
|
|
||||||
private BusRuntime runtime;
|
private BusRuntime runtime;
|
||||||
|
|
||||||
public SubscriptionContext(BusRuntime runtime, MessageHandlerMetadata handlerMetadata,
|
public SubscriptionContext(BusRuntime runtime, MessageHandler handlerMetadata,
|
||||||
Collection<IPublicationErrorHandler> errorHandlers) {
|
Collection<IPublicationErrorHandler> errorHandlers) {
|
||||||
this.runtime = runtime;
|
this.runtime = runtime;
|
||||||
this.handlerMetadata = handlerMetadata;
|
this.handlerMetadata = handlerMetadata;
|
||||||
@ -39,7 +39,7 @@ public class SubscriptionContext implements RuntimeProvider {
|
|||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public MessageHandlerMetadata getHandlerMetadata() {
|
public MessageHandler getHandlerMetadata() {
|
||||||
return handlerMetadata;
|
return handlerMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ import net.engio.mbassy.bus.BusRuntime;
|
|||||||
import net.engio.mbassy.common.StrongConcurrentSet;
|
import net.engio.mbassy.common.StrongConcurrentSet;
|
||||||
import net.engio.mbassy.common.WeakConcurrentSet;
|
import net.engio.mbassy.common.WeakConcurrentSet;
|
||||||
import net.engio.mbassy.dispatch.*;
|
import net.engio.mbassy.dispatch.*;
|
||||||
import net.engio.mbassy.listener.MessageHandlerMetadata;
|
import net.engio.mbassy.listener.MessageHandler;
|
||||||
|
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.Modifier;
|
import java.lang.reflect.Modifier;
|
||||||
@ -20,7 +20,7 @@ public class SubscriptionFactory {
|
|||||||
|
|
||||||
private static final String ErrorHandlers = "error.handlers";
|
private static final String ErrorHandlers = "error.handlers";
|
||||||
|
|
||||||
public Subscription createSubscription(BusRuntime runtime, MessageHandlerMetadata handlerMetadata) throws MessageBusException{
|
public Subscription createSubscription(BusRuntime runtime, MessageHandler handlerMetadata) throws MessageBusException{
|
||||||
try {
|
try {
|
||||||
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(ErrorHandlers);
|
Collection<IPublicationErrorHandler> errorHandlers = runtime.get(ErrorHandlers);
|
||||||
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
|
SubscriptionContext context = new SubscriptionContext(runtime, handlerMetadata, errorHandlers);
|
||||||
|
@ -3,16 +3,17 @@ package net.engio.mbassy.subscription;
|
|||||||
import net.engio.mbassy.bus.BusRuntime;
|
import net.engio.mbassy.bus.BusRuntime;
|
||||||
import net.engio.mbassy.common.ReflectionUtils;
|
import net.engio.mbassy.common.ReflectionUtils;
|
||||||
import net.engio.mbassy.common.StrongConcurrentSet;
|
import net.engio.mbassy.common.StrongConcurrentSet;
|
||||||
import net.engio.mbassy.listener.MessageHandlerMetadata;
|
import net.engio.mbassy.listener.MessageHandler;
|
||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The subscription managers primary task is consistently handle new and existing subscriptions
|
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
|
||||||
* and to synchronize concurrent access to them efficiently. It takes care of properly registering and
|
* It provides fast lookup of existing subscriptions when another instance of an already known
|
||||||
* unregistering message listeners and is a core component of each bus implementation
|
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
|
||||||
|
* message handlers.
|
||||||
*
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 5/11/13
|
* Date: 5/11/13
|
||||||
@ -42,6 +43,7 @@ public class SubscriptionManager {
|
|||||||
// it can be customized by implementing the getSubscriptionFactory() method
|
// it can be customized by implementing the getSubscriptionFactory() method
|
||||||
private final SubscriptionFactory subscriptionFactory;
|
private final SubscriptionFactory subscriptionFactory;
|
||||||
|
|
||||||
|
// synchronize read/write acces to the subscription maps
|
||||||
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
private final BusRuntime runtime;
|
private final BusRuntime runtime;
|
||||||
@ -88,14 +90,14 @@ public class SubscriptionManager {
|
|||||||
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
|
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
|
||||||
// a listener is either subscribed for the first time
|
// a listener is either subscribed for the first time
|
||||||
if (subscriptionsByListener == null) {
|
if (subscriptionsByListener == null) {
|
||||||
List<MessageHandlerMetadata> messageHandlers = metadataReader.getMessageListener(listener.getClass()).getHandlers();
|
List<MessageHandler> messageHandlers = metadataReader.getMessageListener(listener.getClass()).getHandlers();
|
||||||
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
|
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
|
||||||
nonListeners.add(listener.getClass());
|
nonListeners.add(listener.getClass());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
|
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
|
||||||
// create subscriptions for all detected message handlers
|
// create subscriptions for all detected message handlers
|
||||||
for (MessageHandlerMetadata messageHandler : messageHandlers) {
|
for (MessageHandler messageHandler : messageHandlers) {
|
||||||
// create the subscription
|
// create the subscription
|
||||||
subscriptionsByListener.add(subscriptionFactory.createSubscription(runtime, messageHandler));
|
subscriptionsByListener.add(subscriptionFactory.createSubscription(runtime, messageHandler));
|
||||||
}
|
}
|
||||||
@ -157,7 +159,9 @@ public class SubscriptionManager {
|
|||||||
// Note: never returns null!
|
// Note: never returns null!
|
||||||
public Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
|
public Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
|
||||||
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
|
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
|
||||||
|
try{
|
||||||
readWriteLock.readLock().lock();
|
readWriteLock.readLock().lock();
|
||||||
|
|
||||||
if (subscriptionsPerMessage.get(messageType) != null) {
|
if (subscriptionsPerMessage.get(messageType) != null) {
|
||||||
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
|
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
|
||||||
}
|
}
|
||||||
@ -171,12 +175,14 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}finally{
|
||||||
readWriteLock.readLock().unlock();
|
readWriteLock.readLock().unlock();
|
||||||
|
}
|
||||||
return subscriptions;
|
return subscriptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// associate a suscription with a message type
|
// associate a subscription with a message type
|
||||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||||
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
|
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
|
||||||
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
|
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
|
||||||
|
@ -105,7 +105,7 @@ public class FilterTest extends MessageBusTest {
|
|||||||
public static class RejectFilteredObjects implements IMessageFilter{
|
public static class RejectFilteredObjects implements IMessageFilter{
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accepts(Object message, MessageHandlerMetadata metadata) {
|
public boolean accepts(Object message, MessageHandler metadata) {
|
||||||
if(message.getClass().equals(FilteredMessage.class) && ((FilteredMessage)message).getMessage().getClass().equals(Object.class)){
|
if(message.getClass().equals(FilteredMessage.class) && ((FilteredMessage)message).getMessage().getClass().equals(Object.class)){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
package net.engio.mbassy;
|
package net.engio.mbassy;
|
||||||
|
|
||||||
import net.engio.mbassy.common.AssertSupport;
|
import net.engio.mbassy.common.AssertSupport;
|
||||||
|
import net.engio.mbassy.listener.MessageListener;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import net.engio.mbassy.listener.Enveloped;
|
import net.engio.mbassy.listener.Enveloped;
|
||||||
import net.engio.mbassy.listener.Handler;
|
import net.engio.mbassy.listener.Handler;
|
||||||
import net.engio.mbassy.listener.MessageListenerMetadata;
|
|
||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
import net.engio.mbassy.subscription.MessageEnvelope;
|
import net.engio.mbassy.subscription.MessageEnvelope;
|
||||||
|
|
||||||
@ -13,7 +13,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static net.engio.mbassy.listener.MessageListenerMetadata.ForMessage;
|
import static net.engio.mbassy.listener.MessageListener.ForMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -26,7 +26,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListenerWithoutInheritance() {
|
public void testListenerWithoutInheritance() {
|
||||||
MessageListenerMetadata<MessageListener1> listener = reader.getMessageListener(MessageListener1.class);
|
MessageListener<MessageListener1> listener = reader.getMessageListener(MessageListener1.class);
|
||||||
ListenerValidator validator = new ListenerValidator()
|
ListenerValidator validator = new ListenerValidator()
|
||||||
.expectHandlers(2, String.class)
|
.expectHandlers(2, String.class)
|
||||||
.expectHandlers(2, Object.class)
|
.expectHandlers(2, Object.class)
|
||||||
@ -37,7 +37,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListenerWithInheritance() {
|
public void testListenerWithInheritance() {
|
||||||
MessageListenerMetadata<MessageListener2> listener = reader.getMessageListener(MessageListener2.class);
|
MessageListener<MessageListener2> listener = reader.getMessageListener(MessageListener2.class);
|
||||||
ListenerValidator validator = new ListenerValidator()
|
ListenerValidator validator = new ListenerValidator()
|
||||||
.expectHandlers(2, String.class)
|
.expectHandlers(2, String.class)
|
||||||
.expectHandlers(2, Object.class)
|
.expectHandlers(2, Object.class)
|
||||||
@ -47,7 +47,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListenerWithInheritanceOverriding() {
|
public void testListenerWithInheritanceOverriding() {
|
||||||
MessageListenerMetadata<MessageListener3> listener = reader.getMessageListener(MessageListener3.class);
|
MessageListener<MessageListener3> listener = reader.getMessageListener(MessageListener3.class);
|
||||||
|
|
||||||
ListenerValidator validator = new ListenerValidator()
|
ListenerValidator validator = new ListenerValidator()
|
||||||
.expectHandlers(0, String.class)
|
.expectHandlers(0, String.class)
|
||||||
@ -58,7 +58,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEnveloped() {
|
public void testEnveloped() {
|
||||||
MessageListenerMetadata<EnvelopedListener> listener = reader.getMessageListener(EnvelopedListener.class);
|
MessageListener<EnvelopedListener> listener = reader.getMessageListener(EnvelopedListener.class);
|
||||||
ListenerValidator validator = new ListenerValidator()
|
ListenerValidator validator = new ListenerValidator()
|
||||||
.expectHandlers(1, String.class)
|
.expectHandlers(1, String.class)
|
||||||
.expectHandlers(2, Integer.class)
|
.expectHandlers(2, Integer.class)
|
||||||
@ -71,7 +71,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEnvelopedSubclass() {
|
public void testEnvelopedSubclass() {
|
||||||
MessageListenerMetadata<EnvelopedListenerSubclass> listener = reader.getMessageListener(EnvelopedListenerSubclass.class);
|
MessageListener<EnvelopedListenerSubclass> listener = reader.getMessageListener(EnvelopedListenerSubclass.class);
|
||||||
ListenerValidator validator = new ListenerValidator()
|
ListenerValidator validator = new ListenerValidator()
|
||||||
.expectHandlers(1, String.class)
|
.expectHandlers(1, String.class)
|
||||||
.expectHandlers(2, Integer.class)
|
.expectHandlers(2, Integer.class)
|
||||||
@ -91,7 +91,7 @@ public class MetadataReaderTest extends AssertSupport {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void check(MessageListenerMetadata listener){
|
public void check(MessageListener listener){
|
||||||
for(Map.Entry<Class<?>, Integer> expectedHandler: handlers.entrySet()){
|
for(Map.Entry<Class<?>, Integer> expectedHandler: handlers.entrySet()){
|
||||||
if(expectedHandler.getValue() > 0){
|
if(expectedHandler.getValue() > 0){
|
||||||
assertTrue(listener.handles(expectedHandler.getKey()));
|
assertTrue(listener.handles(expectedHandler.getKey()));
|
||||||
|
Loading…
Reference in New Issue
Block a user