Merge of lennartj-master, code style fixes
This commit is contained in:
commit
f52fc0e681
4
.gitignore
vendored
4
.gitignore
vendored
@ -1,5 +1,9 @@
|
|||||||
# idea project settings #
|
# idea project settings #
|
||||||
*.iml
|
*.iml
|
||||||
|
*.ipr
|
||||||
|
*.iws
|
||||||
|
.idea/*
|
||||||
|
.idea
|
||||||
|
|
||||||
# Package Files #
|
# Package Files #
|
||||||
*.war
|
*.war
|
||||||
|
52
pom.xml
52
pom.xml
@ -10,7 +10,7 @@
|
|||||||
<groupId>net.engio</groupId>
|
<groupId>net.engio</groupId>
|
||||||
<artifactId>mbassador</artifactId>
|
<artifactId>mbassador</artifactId>
|
||||||
<version>1.1.4-SNAPSHOT</version>
|
<version>1.1.4-SNAPSHOT</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>bundle</packaging>
|
||||||
<name>mbassador</name>
|
<name>mbassador</name>
|
||||||
<description>
|
<description>
|
||||||
Mbassador is a fast and flexible message bus system following the publish subscribe pattern.
|
Mbassador is a fast and flexible message bus system following the publish subscribe pattern.
|
||||||
@ -23,7 +23,6 @@
|
|||||||
weak-references,
|
weak-references,
|
||||||
message filtering,
|
message filtering,
|
||||||
ordering of message handlers etc.
|
ordering of message handlers etc.
|
||||||
|
|
||||||
</description>
|
</description>
|
||||||
|
|
||||||
<url>https://github.com/bennidi/mbassador</url>
|
<url>https://github.com/bennidi/mbassador</url>
|
||||||
@ -50,6 +49,10 @@
|
|||||||
</developers>
|
</developers>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
<nazgul-codestyle.version>2.0.1</nazgul-codestyle.version>
|
||||||
|
<jdk.version>1.6</jdk.version>
|
||||||
|
<pmd.plugin.version>3.0.1</pmd.plugin.version>
|
||||||
|
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<project.build.java.version>1.6</project.build.java.version>
|
<project.build.java.version>1.6</project.build.java.version>
|
||||||
<github.url>file://${project.basedir}/mvn-local-repo</github.url>
|
<github.url>file://${project.basedir}/mvn-local-repo</github.url>
|
||||||
@ -77,6 +80,51 @@
|
|||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
<!-- plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-pmd-plugin</artifactId>
|
||||||
|
<version>${pmd.plugin.version}</version>
|
||||||
|
<configuration>
|
||||||
|
<excludeRoots>
|
||||||
|
<excludeRoot>src/main/generated</excludeRoot>
|
||||||
|
<excludeRoot>src/test</excludeRoot>
|
||||||
|
</excludeRoots>
|
||||||
|
<rulesets>
|
||||||
|
<ruleset>/codestyle/pmd-rules.xml</ruleset>
|
||||||
|
</rulesets>
|
||||||
|
<targetJdk>${jdk.version}</targetJdk>
|
||||||
|
<sourceEncoding>${project.build.sourceEncoding}</sourceEncoding>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<goals>
|
||||||
|
<goal>check</goal>
|
||||||
|
<goal>cpd-check</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>se.jguru.nazgul.tools.codestyle</groupId>
|
||||||
|
<artifactId>nazgul-codestyle</artifactId>
|
||||||
|
<version>${nazgul-codestyle.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</plugin -->
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.felix</groupId>
|
||||||
|
<artifactId>maven-bundle-plugin</artifactId>
|
||||||
|
<version>2.3.7</version>
|
||||||
|
<extensions>true</extensions>
|
||||||
|
<configuration>
|
||||||
|
<instructions>
|
||||||
|
<Bundle-SymbolicName>${project.groupId}.${project.artifactId}</Bundle-SymbolicName>
|
||||||
|
<Export-Package>{local-packages}</Export-Package>
|
||||||
|
</instructions>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
@ -1,32 +1,44 @@
|
|||||||
package net.engio.mbassy;
|
package net.engio.mbassy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publication error handlers are provided with a publication error every time an error occurs during message publication.
|
* Publication error handlers are provided with a publication error every time an
|
||||||
* A handler might fail with an exception, not be accessible because of the presence of a security manager
|
* error occurs during message publication.
|
||||||
* or other reasons might lead to failures during the message publication process.
|
* A handler might fail with an exception, not be accessible because of the presence
|
||||||
*
|
* of a security manager or other reasons might lead to failures during the message publication process.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 2/22/12
|
* Date: 2/22/12
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("PMD.UnusedModifier")
|
||||||
public interface IPublicationErrorHandler {
|
public interface IPublicationErrorHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle the given publication error.
|
* Handle the given publication error.
|
||||||
*
|
*
|
||||||
* @param error
|
* @param error The PublicationError to handle.
|
||||||
*/
|
*/
|
||||||
public void handleError(PublicationError error);
|
void handleError(PublicationError error);
|
||||||
|
|
||||||
// This is the default error handler it will simply log to standard out and
|
/**
|
||||||
// print stack trace if available
|
* The default error handler will simply log to standard out and
|
||||||
|
* print the stack trace if available.
|
||||||
|
*/
|
||||||
static final class ConsoleLogger implements IPublicationErrorHandler {
|
static final class ConsoleLogger implements IPublicationErrorHandler {
|
||||||
@Override
|
|
||||||
public void handleError(PublicationError error) {
|
|
||||||
System.out.println(error);
|
|
||||||
if (error.getCause() != null) error.getCause().printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
;
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void handleError(final PublicationError error) {
|
||||||
|
|
||||||
|
// Printout the error itself
|
||||||
|
System.out.println(error);
|
||||||
|
|
||||||
|
// Printout the stacktrace from the cause.
|
||||||
|
if (error.getCause() != null) {
|
||||||
|
error.getCause().printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,27 +3,39 @@ package net.engio.mbassy;
|
|||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publication errors are created when object publication fails for some reason and contain details
|
* Publication errors are created when object publication fails
|
||||||
* as to the cause and location where they occured.
|
* for some reason and contain details as to the cause and location
|
||||||
|
* where they occurred.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 2/22/12
|
* Date: 2/22/12
|
||||||
* Time: 4:59 PM
|
* Time: 4:59 PM
|
||||||
*/
|
*/
|
||||||
public class PublicationError {
|
public class PublicationError {
|
||||||
|
|
||||||
|
// Internal state
|
||||||
private Throwable cause;
|
private Throwable cause;
|
||||||
|
|
||||||
private String message;
|
private String message;
|
||||||
|
|
||||||
private Method listener;
|
private Method listener;
|
||||||
|
|
||||||
private Object listeningObject;
|
private Object listeningObject;
|
||||||
|
|
||||||
private Object publishedObject;
|
private Object publishedObject;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compound constructor, creating a PublicationError from the supplied objects.
|
||||||
|
*
|
||||||
|
* @param cause The Throwable giving rise to this PublicationError.
|
||||||
|
* @param message The message to send.
|
||||||
|
* @param listener The method where the error was created.
|
||||||
|
* @param listeningObject The object in which the PublicationError was generated.
|
||||||
|
* @param publishedObject The published object which gave rise to the error.
|
||||||
|
*/
|
||||||
|
public PublicationError(final Throwable cause,
|
||||||
|
final String message,
|
||||||
|
final Method listener,
|
||||||
|
final Object listeningObject,
|
||||||
|
final Object publishedObject) {
|
||||||
|
|
||||||
public PublicationError(Throwable cause, String message, Method listener, Object listeningObject, Object publishedObject) {
|
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
this.message = message;
|
this.message = message;
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
@ -31,14 +43,26 @@ public class PublicationError {
|
|||||||
this.publishedObject = publishedObject;
|
this.publishedObject = publishedObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PublicationError(){
|
/**
|
||||||
|
* Default constructor.
|
||||||
|
*/
|
||||||
|
public PublicationError() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The Throwable giving rise to this PublicationError.
|
||||||
|
*/
|
||||||
public Throwable getCause() {
|
public Throwable getCause() {
|
||||||
return cause;
|
return cause;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assigns the cause of this PublicationError.
|
||||||
|
*
|
||||||
|
* @param cause A Throwable which gave rise to this PublicationError.
|
||||||
|
* @return This PublicationError.
|
||||||
|
*/
|
||||||
public PublicationError setCause(Throwable cause) {
|
public PublicationError setCause(Throwable cause) {
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
return this;
|
return this;
|
||||||
@ -80,6 +104,9 @@ public class PublicationError {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "PublicationError{" +
|
return "PublicationError{" +
|
||||||
|
@ -3,14 +3,28 @@ package net.engio.mbassy.bus;
|
|||||||
import net.engio.mbassy.IPublicationErrorHandler;
|
import net.engio.mbassy.IPublicationErrorHandler;
|
||||||
import net.engio.mbassy.PublicationError;
|
import net.engio.mbassy.PublicationError;
|
||||||
import net.engio.mbassy.common.ReflectionUtils;
|
import net.engio.mbassy.common.ReflectionUtils;
|
||||||
import net.engio.mbassy.subscription.SubscriptionContext;
|
|
||||||
import net.engio.mbassy.listener.MessageHandlerMetadata;
|
import net.engio.mbassy.listener.MessageHandlerMetadata;
|
||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
import net.engio.mbassy.subscription.Subscription;
|
import net.engio.mbassy.subscription.Subscription;
|
||||||
|
import net.engio.mbassy.subscription.SubscriptionContext;
|
||||||
import net.engio.mbassy.subscription.SubscriptionFactory;
|
import net.engio.mbassy.subscription.SubscriptionFactory;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.*;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The base class for all message bus implementations.
|
* The base class for all message bus implementations.
|
||||||
@ -29,15 +43,17 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
// all subscriptions per message type
|
// all subscriptions per message type
|
||||||
// this is the primary list for dispatching a specific message
|
// this is the primary list for dispatching a specific message
|
||||||
// write access is synchronized and happens very infrequently
|
// write access is synchronized and happens very infrequently
|
||||||
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage = new HashMap(50);
|
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage
|
||||||
|
= new HashMap<Class, Collection<Subscription>>(50);
|
||||||
|
|
||||||
// all subscriptions per messageHandler type
|
// all subscriptions per messageHandler type
|
||||||
// this list provides fast access for subscribing and unsubscribing
|
// this list provides fast access for subscribing and unsubscribing
|
||||||
// write access is synchronized and happens very infrequently
|
// write access is synchronized and happens very infrequently
|
||||||
private final Map<Class, Collection<Subscription>> subscriptionsPerListener = new HashMap(50);
|
private final Map<Class, Collection<Subscription>> subscriptionsPerListener
|
||||||
|
= new HashMap<Class, Collection<Subscription>>(50);
|
||||||
|
|
||||||
// remember already processed classes that do not contain any listeners
|
// remember already processed classes that do not contain any listeners
|
||||||
private final Collection<Class> nonListeners = new HashSet();
|
private final Collection<Class> nonListeners = new HashSet<Class>();
|
||||||
|
|
||||||
// this handler will receive all errors that occur during message dispatch or message handling
|
// this handler will receive all errors that occur during message dispatch or message handling
|
||||||
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
|
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
|
||||||
@ -89,7 +105,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MessagePublication.Factory getPublicationFactory(){
|
protected MessagePublication.Factory getPublicationFactory() {
|
||||||
return publicationFactory;
|
return publicationFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,9 +115,13 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean unsubscribe(Object listener) {
|
public boolean unsubscribe(Object listener) {
|
||||||
if (listener == null) return false;
|
if (listener == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
|
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
|
||||||
if (subscriptions == null) return false;
|
if (subscriptions == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
boolean isRemoved = true;
|
boolean isRemoved = true;
|
||||||
for (Subscription subscription : subscriptions) {
|
for (Subscription subscription : subscriptions) {
|
||||||
isRemoved = isRemoved && subscription.unsubscribe(listener);
|
isRemoved = isRemoved && subscription.unsubscribe(listener);
|
||||||
@ -113,8 +133,9 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
public void subscribe(Object listener) {
|
public void subscribe(Object listener) {
|
||||||
try {
|
try {
|
||||||
Class listeningClass = listener.getClass();
|
Class listeningClass = listener.getClass();
|
||||||
if (nonListeners.contains(listeningClass))
|
if (nonListeners.contains(listeningClass)) {
|
||||||
return; // early reject of known classes that do not participate in eventing
|
return; // early reject of known classes that do not participate in eventing
|
||||||
|
}
|
||||||
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
|
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
|
||||||
if (subscriptionsByListener == null) { // if the type is registered for the first time
|
if (subscriptionsByListener == null) { // if the type is registered for the first time
|
||||||
synchronized (this) { // new subscriptions must be processed sequentially
|
synchronized (this) { // new subscriptions must be processed sequentially
|
||||||
@ -135,7 +156,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions)
|
subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions)
|
||||||
|
|
||||||
List<Class<?>> messageTypes = messageHandler.getHandledMessages();
|
List<Class<?>> messageTypes = messageHandler.getHandledMessages();
|
||||||
for(Class<?> messageType : messageTypes){
|
for (Class<?> messageType : messageTypes) {
|
||||||
addMessageTypeSubscription(messageType, subscription);
|
addMessageTypeSubscription(messageType, subscription);
|
||||||
}
|
}
|
||||||
//updateMessageTypeHierarchy(eventType);
|
//updateMessageTypeHierarchy(eventType);
|
||||||
@ -145,7 +166,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// register the listener to the existing subscriptions
|
// register the listener to the existing subscriptions
|
||||||
for (Subscription sub : subscriptionsByListener){
|
for (Subscription sub : subscriptionsByListener) {
|
||||||
sub.subscribe(listener);
|
sub.subscribe(listener);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -154,12 +175,12 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void addErrorHandler(IPublicationErrorHandler handler) {
|
public final void addErrorHandler(IPublicationErrorHandler handler) {
|
||||||
errorHandlers.add(handler);
|
errorHandlers.add(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
// this method enqueues a message delivery request
|
// this method enqueues a message delivery request
|
||||||
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request){
|
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) {
|
||||||
try {
|
try {
|
||||||
pendingMessages.put(request);
|
pendingMessages.put(request);
|
||||||
return request.markScheduled();
|
return request.markScheduled();
|
||||||
@ -169,7 +190,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
|
|
||||||
// this method enqueues a message delivery request
|
// this method enqueues a message delivery request
|
||||||
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit){
|
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
|
||||||
try {
|
try {
|
||||||
return pendingMessages.offer(request, timeout, unit)
|
return pendingMessages.offer(request, timeout, unit)
|
||||||
? request.markScheduled()
|
? request.markScheduled()
|
||||||
@ -191,8 +212,10 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
|
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
|
||||||
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
|
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
|
||||||
if (subs != null) {
|
if (subs != null) {
|
||||||
for(Subscription sub : subs){
|
for (Subscription sub : subs) {
|
||||||
if(sub.handlesMessageType(messageType))subscriptions.add(sub);
|
if (sub.handlesMessageType(messageType)) {
|
||||||
|
subscriptions.add(sub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -200,7 +223,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// associate a suscription with a message type
|
// associate a suscription with a message type
|
||||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||||
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
|
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
|
||||||
@ -213,10 +235,8 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public void handlePublicationError(PublicationError error) {
|
public void handlePublicationError(PublicationError error) {
|
||||||
for (IPublicationErrorHandler errorHandler : errorHandlers){
|
for (IPublicationErrorHandler errorHandler : errorHandlers) {
|
||||||
errorHandler.handleError(error);
|
errorHandler.handleError(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -227,14 +247,14 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
super.finalize();
|
super.finalize();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shutdown(){
|
private void shutdown() {
|
||||||
for (Thread dispatcher : dispatchers) {
|
for (Thread dispatcher : dispatchers) {
|
||||||
dispatcher.interrupt();
|
dispatcher.interrupt();
|
||||||
}
|
}
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasPendingMessages(){
|
public boolean hasPendingMessages() {
|
||||||
return pendingMessages.size() > 0;
|
return pendingMessages.size() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,7 +3,12 @@ package net.engio.mbassy.bus;
|
|||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
import net.engio.mbassy.subscription.SubscriptionFactory;
|
import net.engio.mbassy.subscription.SubscriptionFactory;
|
||||||
|
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
|
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
|
||||||
@ -22,7 +27,7 @@ public class BusConfiguration {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public static final BusConfiguration Default(){
|
public static BusConfiguration Default() {
|
||||||
return new BusConfiguration();
|
return new BusConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,7 +7,6 @@ import java.util.concurrent.Executor;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
|
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
|
||||||
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
|
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
|
||||||
* The dispatch mechanism can by controlled for per message handler and message publication.
|
* The dispatch mechanism can by controlled for per message handler and message publication.
|
||||||
@ -41,7 +40,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
|
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
|
||||||
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
|
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener
|
||||||
* after the remove operation completed.
|
* after the remove operation completed.
|
||||||
*
|
* <p/>
|
||||||
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
|
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
|
||||||
* get dispatched to all message handlers that take an instance of List as their parameter
|
* get dispatched to all message handlers that take an instance of List as their parameter
|
||||||
*
|
*
|
||||||
@ -57,14 +56,13 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
*
|
*
|
||||||
* @param listener
|
* @param listener
|
||||||
*/
|
*/
|
||||||
public void subscribe(Object listener);
|
void subscribe(Object listener);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
|
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
|
||||||
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
|
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
|
||||||
* publications that have been published when the message listener was still subscribed).
|
* publications that have been published when the message listener was still subscribed).
|
||||||
*
|
* <p/>
|
||||||
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
|
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
|
||||||
* handlers will not have any effect and is silently ignored.
|
* handlers will not have any effect and is silently ignored.
|
||||||
*
|
*
|
||||||
@ -72,14 +70,13 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
* @return true, if the listener was found and successfully removed
|
* @return true, if the listener was found and successfully removed
|
||||||
* false otherwise
|
* false otherwise
|
||||||
*/
|
*/
|
||||||
public boolean unsubscribe(Object listener);
|
boolean unsubscribe(Object listener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @param message
|
* @param message
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public P post(T message);
|
P post(T message);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
|
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
|
||||||
@ -89,15 +86,14 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
*
|
*
|
||||||
* @param errorHandler
|
* @param errorHandler
|
||||||
*/
|
*/
|
||||||
public void addErrorHandler(IPublicationErrorHandler errorHandler);
|
void addErrorHandler(IPublicationErrorHandler errorHandler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an immutable collection containing all the registered error handlers
|
* Returns an immutable collection containing all the registered error handlers
|
||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
|
Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the executor service that is used to asynchronous message publication.
|
* Get the executor service that is used to asynchronous message publication.
|
||||||
@ -105,54 +101,49 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public Executor getExecutor();
|
Executor getExecutor();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether any asynchronous message publications are pending for being processed
|
* Check whether any asynchronous message publications are pending for being processed
|
||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public boolean hasPendingMessages();
|
boolean hasPendingMessages();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A post command is used as an intermediate object created by a call to the message bus' post method.
|
* A post command is used as an intermediate object created by a call to the message bus' post method.
|
||||||
* It encapsulates the functionality provided by the message bus that created the command.
|
* It encapsulates the functionality provided by the message bus that created the command.
|
||||||
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
|
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public static interface IPostCommand<T>{
|
interface IPostCommand<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication immediately. This call blocks until every matching message handler
|
* Execute the message publication immediately. This call blocks until every matching message handler
|
||||||
* has been invoked.
|
* has been invoked.
|
||||||
*/
|
*/
|
||||||
public void now();
|
void now();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||||
* configured queuing strategy:
|
* configured queuing strategy:
|
||||||
*
|
* <p/>
|
||||||
* If an unbound queuing strategy is used the call returns immediately.
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||||
*
|
*
|
||||||
* @return A message publication that can be used to access information about the state of
|
* @return A message publication that can be used to access information about the state of
|
||||||
*/
|
*/
|
||||||
public MessagePublication asynchronously();
|
MessagePublication asynchronously();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||||
* configured queuing strategy:
|
* configured queuing strategy:
|
||||||
*
|
* <p/>
|
||||||
* If an unbound queuing strategy is used the call returns immediately.
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
* If a bounded queue is used the call will block until the message can be placed in the queue
|
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||||
* or the timeout is reached.
|
* or the timeout is reached.
|
||||||
*
|
*
|
||||||
* @return A message publication that wraps up the publication request
|
* @return A message publication that wraps up the publication request
|
||||||
*/
|
*/
|
||||||
public MessagePublication asynchronously(long timeout, TimeUnit unit);
|
MessagePublication asynchronously(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -29,12 +29,12 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
|
|||||||
// Dead Event
|
// Dead Event
|
||||||
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
||||||
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
|
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
|
||||||
|
} else {
|
||||||
|
return getPublicationFactory().createPublication(this, subscriptions, message);
|
||||||
}
|
}
|
||||||
else return getPublicationFactory().createPublication(this, subscriptions, message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
|
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
|
||||||
* The call blocks until every messageHandler has processed the message.
|
* The call blocks until every messageHandler has processed the message.
|
||||||
|
@ -10,7 +10,7 @@ import java.util.Collection;
|
|||||||
* A message publication is created for each asynchronous message dispatch. It reflects the state
|
* A message publication is created for each asynchronous message dispatch. It reflects the state
|
||||||
* of the corresponding message publication process, i.e. provides information whether the
|
* of the corresponding message publication process, i.e. provides information whether the
|
||||||
* publication was successfully scheduled, is currently running etc.
|
* publication was successfully scheduled, is currently running etc.
|
||||||
*
|
* <p/>
|
||||||
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
|
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
|
||||||
* be used in multiple threads simultaneously .
|
* be used in multiple threads simultaneously .
|
||||||
*
|
*
|
||||||
@ -19,9 +19,9 @@ import java.util.Collection;
|
|||||||
*/
|
*/
|
||||||
public class MessagePublication {
|
public class MessagePublication {
|
||||||
|
|
||||||
public static class Factory{
|
public static class Factory {
|
||||||
|
|
||||||
public MessagePublication createPublication(IMessageBus owningBus, Collection<Subscription> subscriptions, Object message){
|
public MessagePublication createPublication(IMessageBus owningBus, Collection<Subscription> subscriptions, Object message) {
|
||||||
return new MessagePublication(owningBus, subscriptions, message, State.Initial);
|
return new MessagePublication(owningBus, subscriptions, message, State.Initial);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,17 +48,17 @@ public class MessagePublication {
|
|||||||
return subscriptions.add(subscription);
|
return subscriptions.add(subscription);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void execute(){
|
protected void execute() {
|
||||||
state = State.Running;
|
state = State.Running;
|
||||||
for(Subscription sub : subscriptions){
|
for (Subscription sub : subscriptions) {
|
||||||
sub.publish(this, message);
|
sub.publish(this, message);
|
||||||
}
|
}
|
||||||
state = State.Finished;
|
state = State.Finished;
|
||||||
// if the message has not been marked delivered by the dispatcher
|
// if the message has not been marked delivered by the dispatcher
|
||||||
if(!delivered){
|
if (!delivered) {
|
||||||
if(!isFilteredEvent() && !isDeadEvent()){
|
if (!isFilteredEvent() && !isDeadEvent()) {
|
||||||
bus.post(new FilteredMessage(message)).now();
|
bus.post(new FilteredMessage(message)).now();
|
||||||
}else if(!isDeadEvent()){
|
} else if (!isDeadEvent()) {
|
||||||
bus.post(new DeadMessage(message)).now();
|
bus.post(new DeadMessage(message)).now();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,32 +77,33 @@ public class MessagePublication {
|
|||||||
return state.equals(State.Scheduled);
|
return state.equals(State.Scheduled);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markDelivered(){
|
public void markDelivered() {
|
||||||
delivered = true;
|
delivered = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MessagePublication markScheduled(){
|
public MessagePublication markScheduled() {
|
||||||
if(!state.equals(State.Initial))
|
if (!state.equals(State.Initial)) {
|
||||||
return this;
|
return this;
|
||||||
|
}
|
||||||
state = State.Scheduled;
|
state = State.Scheduled;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MessagePublication setError(){
|
public MessagePublication setError() {
|
||||||
state = State.Error;
|
state = State.Error;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDeadEvent(){
|
public boolean isDeadEvent() {
|
||||||
return DeadMessage.class.isAssignableFrom(message.getClass());
|
return DeadMessage.class.isAssignableFrom(message.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFilteredEvent(){
|
public boolean isFilteredEvent() {
|
||||||
return FilteredMessage.class.isAssignableFrom(message.getClass());
|
return FilteredMessage.class.isAssignableFrom(message.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum State{
|
private enum State {
|
||||||
Initial,Scheduled,Running,Finished,Error;
|
Initial, Scheduled, Running, Finished, Error
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -20,27 +20,32 @@ import java.util.WeakHashMap;
|
|||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 2/12/12
|
* Date: 2/12/12
|
||||||
*/
|
*/
|
||||||
public class ConcurrentSet<T> implements Iterable<T>{
|
public class ConcurrentSet<T> implements Iterable<T> {
|
||||||
|
|
||||||
|
// Internal state
|
||||||
|
private final Object lock = new Object();
|
||||||
private WeakHashMap<T, Entry<T>> entries = new WeakHashMap<T, Entry<T>>(); // maintain a map of entries for O(log n) lookup
|
private WeakHashMap<T, Entry<T>> entries = new WeakHashMap<T, Entry<T>>(); // maintain a map of entries for O(log n) lookup
|
||||||
|
|
||||||
private Entry<T> head; // reference to the first element
|
private Entry<T> head; // reference to the first element
|
||||||
|
|
||||||
public ConcurrentSet<T> add(T element) {
|
public ConcurrentSet<T> add(T element) {
|
||||||
if (element == null || entries.containsKey(element)) return this;
|
if (element == null || entries.containsKey(element)) {
|
||||||
synchronized (this) {
|
return this;
|
||||||
|
}
|
||||||
|
synchronized (lock) {
|
||||||
insert(element);
|
insert(element);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean contains(T element){
|
public boolean contains(T element) {
|
||||||
Entry<T> entry = entries.get(element);
|
Entry<T> entry = entries.get(element);
|
||||||
return entry != null && entry.getValue() != null;
|
return entry != null && entry.getValue() != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insert(T element) {
|
private void insert(T element) {
|
||||||
if (entries.containsKey(element)) return;
|
if (entries.containsKey(element)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (head == null) {
|
if (head == null) {
|
||||||
head = new Entry<T>(element);
|
head = new Entry<T>(element);
|
||||||
} else {
|
} else {
|
||||||
@ -49,14 +54,16 @@ public class ConcurrentSet<T> implements Iterable<T>{
|
|||||||
entries.put(element, head);
|
entries.put(element, head);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size(){
|
public int size() {
|
||||||
return entries.size();
|
return entries.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConcurrentSet<T> addAll(Iterable<T> elements) {
|
public ConcurrentSet<T> addAll(Iterable<T> elements) {
|
||||||
synchronized (this) {
|
synchronized (lock) {
|
||||||
for (T element : elements) {
|
for (T element : elements) {
|
||||||
if (element == null || entries.containsKey(element)) return this;
|
if (element == null || entries.containsKey(element)) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
insert(element);
|
insert(element);
|
||||||
}
|
}
|
||||||
@ -65,10 +72,14 @@ public class ConcurrentSet<T> implements Iterable<T>{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean remove(T element) {
|
public boolean remove(T element) {
|
||||||
if (!entries.containsKey(element)) return false;
|
if (!entries.containsKey(element)) {
|
||||||
synchronized (this) {
|
return false;
|
||||||
|
}
|
||||||
|
synchronized (lock) {
|
||||||
Entry<T> listelement = entries.get(element);
|
Entry<T> listelement = entries.get(element);
|
||||||
if(listelement == null)return false; //removed by other thread
|
if (listelement == null) {
|
||||||
|
return false; //removed by other thread
|
||||||
|
}
|
||||||
if (listelement != head) {
|
if (listelement != head) {
|
||||||
listelement.remove();
|
listelement.remove();
|
||||||
} else {
|
} else {
|
||||||
@ -99,7 +110,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public T next() {
|
public T next() {
|
||||||
if (current == null) return null;
|
if (current == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
T value = current.getValue();
|
T value = current.getValue();
|
||||||
if (value == null) { // auto-removal of orphan references
|
if (value == null) { // auto-removal of orphan references
|
||||||
do {
|
do {
|
||||||
@ -113,7 +126,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void remove() {
|
public void remove() {
|
||||||
if (current == null) return;
|
if (current == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
Entry<T> newCurrent = current.next();
|
Entry<T> newCurrent = current.next();
|
||||||
ConcurrentSet.this.remove(current.getValue());
|
ConcurrentSet.this.remove(current.getValue());
|
||||||
current = newCurrent;
|
current = newCurrent;
|
||||||
@ -149,7 +164,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
|
|||||||
public void remove() {
|
public void remove() {
|
||||||
if (predecessor != null) {
|
if (predecessor != null) {
|
||||||
predecessor.next = next;
|
predecessor.next = next;
|
||||||
if(next != null)next.predecessor = predecessor;
|
if (next != null) {
|
||||||
|
next.predecessor = predecessor;
|
||||||
|
}
|
||||||
} else if (next != null) {
|
} else if (next != null) {
|
||||||
next.predecessor = null;
|
next.predecessor = null;
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package net.engio.mbassy.common;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Created with IntelliJ IDEA.
|
* Created with IntelliJ IDEA.
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 10/22/12
|
* Date: 10/22/12
|
||||||
* Time: 9:33 AM
|
* Time: 9:33 AM
|
||||||
@ -9,6 +10,5 @@ package net.engio.mbassy.common;
|
|||||||
*/
|
*/
|
||||||
public interface IPredicate<T> {
|
public interface IPredicate<T> {
|
||||||
|
|
||||||
public boolean apply(T target);
|
boolean apply(T target);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -38,12 +38,11 @@ public class ReflectionUtils {
|
|||||||
* @param subclass
|
* @param subclass
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static Method getOverridingMethod(Method overridingMethod, Class subclass) {
|
public static Method getOverridingMethod(final Method overridingMethod, final Class subclass) {
|
||||||
Class current = subclass;
|
Class current = subclass;
|
||||||
while(!current.equals(overridingMethod.getDeclaringClass())){
|
while (!current.equals(overridingMethod.getDeclaringClass())) {
|
||||||
try {
|
try {
|
||||||
Method overridden = current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes());
|
return current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes());
|
||||||
return overridden;
|
|
||||||
} catch (NoSuchMethodException e) {
|
} catch (NoSuchMethodException e) {
|
||||||
current = current.getSuperclass();
|
current = current.getSuperclass();
|
||||||
}
|
}
|
||||||
@ -51,10 +50,12 @@ public class ReflectionUtils {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<Method> withoutOverridenSuperclassMethods(List<Method> allMethods) {
|
public static List<Method> withoutOverridenSuperclassMethods(final List<Method> allMethods) {
|
||||||
List<Method> filtered = new LinkedList<Method>();
|
List<Method> filtered = new LinkedList<Method>();
|
||||||
for (Method method : allMethods) {
|
for (Method method : allMethods) {
|
||||||
if (!containsOverridingMethod(allMethods, method)) filtered.add(method);
|
if (!containsOverridingMethod(allMethods, method)) {
|
||||||
|
filtered.add(method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return filtered;
|
return filtered;
|
||||||
}
|
}
|
||||||
@ -68,9 +69,11 @@ public class ReflectionUtils {
|
|||||||
return superclasses;
|
return superclasses;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean containsOverridingMethod(List<Method> allMethods, Method methodToCheck) {
|
public static boolean containsOverridingMethod(final List<Method> allMethods, final Method methodToCheck) {
|
||||||
for (Method method : allMethods) {
|
for (Method method : allMethods) {
|
||||||
if (isOverriddenBy(methodToCheck, method)) return true;
|
if (isOverriddenBy(methodToCheck, method)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -88,12 +91,9 @@ public class ReflectionUtils {
|
|||||||
Class[] superClassMethodParameters = superclassMethod.getParameterTypes();
|
Class[] superClassMethodParameters = superclassMethod.getParameterTypes();
|
||||||
Class[] subClassMethodParameters = superclassMethod.getParameterTypes();
|
Class[] subClassMethodParameters = superclassMethod.getParameterTypes();
|
||||||
// method must specify the same number of parameters
|
// method must specify the same number of parameters
|
||||||
if(subClassMethodParameters.length != subClassMethodParameters.length){
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
//the parameters must occur in the exact same order
|
//the parameters must occur in the exact same order
|
||||||
for(int i = 0 ; i< subClassMethodParameters.length; i++){
|
for (int i = 0; i < subClassMethodParameters.length; i++) {
|
||||||
if(!superClassMethodParameters[i].equals(subClassMethodParameters[i])){
|
if (!superClassMethodParameters[i].equals(subClassMethodParameters[i])) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,9 @@ public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAw
|
|||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void invoke(final Object listener, final Object message) {
|
public void invoke(final Object listener, final Object message) {
|
||||||
getContext().getOwningBus().getExecutor().execute(new Runnable() {
|
getContext().getOwningBus().getExecutor().execute(new Runnable() {
|
||||||
|
@ -9,7 +9,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
|
|||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 3/1/13
|
* Date: 3/1/13
|
||||||
*/
|
*/
|
||||||
public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher{
|
public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
|
||||||
|
|
||||||
private IMessageDispatcher delegate;
|
private IMessageDispatcher delegate;
|
||||||
|
|
||||||
|
@ -7,13 +7,13 @@ import net.engio.mbassy.subscription.MessageEnvelope;
|
|||||||
/**
|
/**
|
||||||
* The enveloped dispatcher will wrap published messages in an envelope before
|
* The enveloped dispatcher will wrap published messages in an envelope before
|
||||||
* passing them to their configured dispatcher.
|
* passing them to their configured dispatcher.
|
||||||
*
|
* <p/>
|
||||||
* All enveloped message handlers will have this dispatcher in their chain
|
* All enveloped message handlers will have this dispatcher in their chain
|
||||||
*
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 12/12/12
|
* Date: 12/12/12
|
||||||
*/
|
*/
|
||||||
public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher{
|
public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher {
|
||||||
|
|
||||||
|
|
||||||
public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) {
|
public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) {
|
||||||
|
@ -25,10 +25,11 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
|
|||||||
|
|
||||||
if (filter == null) {
|
if (filter == null) {
|
||||||
return true;
|
return true;
|
||||||
|
} else {
|
||||||
|
for (IMessageFilter aFilter : filter) {
|
||||||
|
if (!aFilter.accepts(message, getContext().getHandlerMetadata())) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
for (int i = 0; i < filter.length; i++) {
|
|
||||||
if (!filter[i].accepts(message, getContext().getHandlerMetadata())) return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -37,7 +38,7 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
|
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
|
||||||
if(passesFilter(message)){
|
if (passesFilter(message)) {
|
||||||
getDelegate().dispatch(publication, message, listeners);
|
getDelegate().dispatch(publication, message, listeners);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,5 @@ public interface IHandlerInvocation extends ISubscriptionContextAware {
|
|||||||
* @param listener The listener that will receive the message
|
* @param listener The listener that will receive the message
|
||||||
* @param message The message to be delivered to the listener
|
* @param message The message to be delivered to the listener
|
||||||
*/
|
*/
|
||||||
public void invoke(final Object listener, final Object message);
|
void invoke(Object listener, Object message);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -10,5 +10,5 @@ import net.engio.mbassy.bus.IMessageBus;
|
|||||||
*/
|
*/
|
||||||
public interface IMessageBusAware {
|
public interface IMessageBusAware {
|
||||||
|
|
||||||
public IMessageBus getBus();
|
IMessageBus getBus();
|
||||||
}
|
}
|
||||||
|
@ -7,10 +7,10 @@ import net.engio.mbassy.common.ConcurrentSet;
|
|||||||
* A message dispatcher provides the functionality to deliver a single message
|
* A message dispatcher provides the functionality to deliver a single message
|
||||||
* to a set of listeners. A message dispatcher uses a message context to access
|
* to a set of listeners. A message dispatcher uses a message context to access
|
||||||
* all information necessary for the message delivery.
|
* all information necessary for the message delivery.
|
||||||
*
|
* <p/>
|
||||||
* The delivery of a single message to a single listener is responsibility of the
|
* The delivery of a single message to a single listener is responsibility of the
|
||||||
* handler invocation object associated with the dispatcher.
|
* handler invocation object associated with the dispatcher.
|
||||||
*
|
* <p/>
|
||||||
* Implementations if IMessageDispatcher are partially designed using decorator pattern
|
* Implementations if IMessageDispatcher are partially designed using decorator pattern
|
||||||
* such that it is possible to compose different message dispatchers into dispatcher chains
|
* such that it is possible to compose different message dispatchers into dispatcher chains
|
||||||
* to achieve more complex dispatch logic.
|
* to achieve more complex dispatch logic.
|
||||||
@ -29,12 +29,14 @@ public interface IMessageDispatcher extends ISubscriptionContextAware {
|
|||||||
* @param message The message that should be delivered to the listeners
|
* @param message The message that should be delivered to the listeners
|
||||||
* @param listeners The listeners that should receive the message
|
* @param listeners The listeners that should receive the message
|
||||||
*/
|
*/
|
||||||
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners);
|
void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the handler invocation that will be used to deliver the message to each
|
* Get the handler invocation that will be used to deliver the
|
||||||
* listener
|
* message to each listener.
|
||||||
* @return
|
*
|
||||||
|
* @return the handler invocation that will be used to deliver the
|
||||||
|
* message to each listener
|
||||||
*/
|
*/
|
||||||
public IHandlerInvocation getInvocation();
|
IHandlerInvocation getInvocation();
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,7 @@ public interface ISubscriptionContextAware extends IMessageBusAware {
|
|||||||
/**
|
/**
|
||||||
* Get the subscription context associated with this object
|
* Get the subscription context associated with this object
|
||||||
*
|
*
|
||||||
* @return
|
* @return the subscription context associated with this object
|
||||||
*/
|
*/
|
||||||
public SubscriptionContext getContext();
|
SubscriptionContext getContext();
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Standard implementation for direct, unfiltered message delivery.
|
* Standard implementation for direct, unfiltered message delivery.
|
||||||
*
|
* <p/>
|
||||||
* For each message delivery, this dispatcher iterates over the listeners
|
* For each message delivery, this dispatcher iterates over the listeners
|
||||||
* and uses the previously provided handler invocation to deliver the message
|
* and uses the previously provided handler invocation to deliver the message
|
||||||
* to each listener
|
* to each listener
|
||||||
@ -27,7 +27,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
|
|||||||
@Override
|
@Override
|
||||||
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
|
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
|
||||||
publication.markDelivered();
|
publication.markDelivered();
|
||||||
for(Object listener: listeners){
|
for (Object listener : listeners) {
|
||||||
getInvocation().invoke(listener, message);
|
getInvocation().invoke(listener, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -36,5 +36,4 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
|
|||||||
public IHandlerInvocation getInvocation() {
|
public IHandlerInvocation getInvocation() {
|
||||||
return invocation;
|
return invocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -15,42 +15,39 @@ import java.util.Collection;
|
|||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 11/23/12
|
* Date: 11/23/12
|
||||||
*/
|
*/
|
||||||
public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation{
|
public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
|
||||||
|
|
||||||
public ReflectiveHandlerInvocation(SubscriptionContext context) {
|
public ReflectiveHandlerInvocation(SubscriptionContext context) {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handlePublicationError(PublicationError error){
|
protected void handlePublicationError(PublicationError error) {
|
||||||
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
|
Collection<IPublicationErrorHandler> handlers = getContext().getOwningBus().getRegisteredErrorHandlers();
|
||||||
for(IPublicationErrorHandler handler : handlers){
|
for (IPublicationErrorHandler handler : handlers) {
|
||||||
handler.handleError(error);
|
handler.handleError(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void invokeHandler(final Object message, final Object listener, Method handler){
|
protected void invokeHandler(final Object message, final Object listener, Method handler) {
|
||||||
try {
|
try {
|
||||||
handler.invoke(listener, message);
|
handler.invoke(listener, message);
|
||||||
}catch(IllegalAccessException e){
|
} catch (IllegalAccessException e) {
|
||||||
handlePublicationError(
|
handlePublicationError(
|
||||||
new PublicationError(e, "Error during messageHandler notification. " +
|
new PublicationError(e, "Error during messageHandler notification. " +
|
||||||
"The class or method is not accessible",
|
"The class or method is not accessible",
|
||||||
handler, listener, message));
|
handler, listener, message));
|
||||||
}
|
} catch (IllegalArgumentException e) {
|
||||||
catch(IllegalArgumentException e){
|
|
||||||
handlePublicationError(
|
handlePublicationError(
|
||||||
new PublicationError(e, "Error during messageHandler notification. " +
|
new PublicationError(e, "Error during messageHandler notification. " +
|
||||||
"Wrong arguments passed to method. Was: " + message.getClass()
|
"Wrong arguments passed to method. Was: " + message.getClass()
|
||||||
+ "Expected: " + handler.getParameterTypes()[0],
|
+ "Expected: " + handler.getParameterTypes()[0],
|
||||||
handler, listener, message));
|
handler, listener, message));
|
||||||
}
|
} catch (InvocationTargetException e) {
|
||||||
catch (InvocationTargetException e) {
|
|
||||||
handlePublicationError(
|
handlePublicationError(
|
||||||
new PublicationError(e, "Error during messageHandler notification. " +
|
new PublicationError(e, "Error during messageHandler notification. " +
|
||||||
"Message handler threw exception",
|
"Message handler threw exception",
|
||||||
handler, listener, message));
|
handler, listener, message));
|
||||||
}
|
} catch (Throwable e) {
|
||||||
catch (Throwable e) {
|
|
||||||
handlePublicationError(
|
handlePublicationError(
|
||||||
new PublicationError(e, "Error during messageHandler notification. " +
|
new PublicationError(e, "Error during messageHandler notification. " +
|
||||||
"Unexpected exception",
|
"Unexpected exception",
|
||||||
@ -58,6 +55,9 @@ public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAwar
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void invoke(final Object listener, final Object message) {
|
public void invoke(final Object listener, final Object message) {
|
||||||
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());
|
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
package net.engio.mbassy.listener;
|
package net.engio.mbassy.listener;
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
import java.lang.annotation.ElementType;
|
||||||
|
import java.lang.annotation.Inherited;
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.RetentionPolicy;
|
||||||
|
import java.lang.annotation.Target;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure a handler to receive an enveloped message as a wrapper around the source
|
* Configure a handler to receive an enveloped message as a wrapper around the source
|
||||||
|
@ -10,8 +10,9 @@ import java.lang.annotation.Target;
|
|||||||
* It references a class that implements the IMessageFilter interface.
|
* It references a class that implements the IMessageFilter interface.
|
||||||
* The filter will be used to check whether a message should be delivered
|
* The filter will be used to check whether a message should be delivered
|
||||||
* to the listener or not.
|
* to the listener or not.
|
||||||
*
|
|
||||||
* <p/>
|
* <p/>
|
||||||
|
* <p/>
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 2/14/12
|
* Date: 2/14/12
|
||||||
*/
|
*/
|
||||||
@ -22,6 +23,7 @@ public @interface Filter {
|
|||||||
/**
|
/**
|
||||||
* The class that implements the filter.
|
* The class that implements the filter.
|
||||||
* Note: A filter always needs to provide a non-arg constructor
|
* Note: A filter always needs to provide a non-arg constructor
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
Class<? extends IMessageFilter> value();
|
Class<? extends IMessageFilter> value();
|
||||||
|
@ -30,8 +30,10 @@ public class Filters {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
|
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
|
||||||
for(Class handledMessage : metadata.getHandledMessages()){
|
for (Class handledMessage : metadata.getHandledMessages()) {
|
||||||
if(handledMessage.equals(event.getClass()))return true;
|
if (handledMessage.equals(event.getClass())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
package net.engio.mbassy.listener;
|
package net.engio.mbassy.listener;
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
import java.lang.annotation.ElementType;
|
||||||
|
import java.lang.annotation.Inherited;
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.RetentionPolicy;
|
||||||
|
import java.lang.annotation.Target;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark any method of any object(=listener) as a message handler and configure the handler
|
* Mark any method of any object(=listener) as a message handler and configure the handler
|
||||||
@ -23,7 +27,6 @@ public @interface Handler {
|
|||||||
/**
|
/**
|
||||||
* Define the mode in which a message is delivered to each listener. Listeners can be notified
|
* Define the mode in which a message is delivered to each listener. Listeners can be notified
|
||||||
* sequentially or concurrently.
|
* sequentially or concurrently.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
Mode delivery() default Mode.Sequential;
|
Mode delivery() default Mode.Sequential;
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ package net.engio.mbassy.listener;
|
|||||||
/**
|
/**
|
||||||
* Message filters can be used to prevent certain messages to be delivered to a specific listener.
|
* Message filters can be used to prevent certain messages to be delivered to a specific listener.
|
||||||
* If a filter is used the message will only be delivered if it passes the filter(s)
|
* If a filter is used the message will only be delivered if it passes the filter(s)
|
||||||
*
|
* <p/>
|
||||||
* NOTE: A message filter must provide either a no-arg constructor.
|
* NOTE: A message filter must provide either a no-arg constructor.
|
||||||
*
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
@ -14,10 +14,8 @@ public interface IMessageFilter {
|
|||||||
/**
|
/**
|
||||||
* Evaluate the message to ensure that it matches the handler configuration
|
* Evaluate the message to ensure that it matches the handler configuration
|
||||||
*
|
*
|
||||||
*
|
|
||||||
* @param message the message to be delivered
|
* @param message the message to be delivered
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public boolean accepts(Object message, MessageHandlerMetadata metadata);
|
boolean accepts(Object message, MessageHandlerMetadata metadata);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
package net.engio.mbassy.listener;
|
package net.engio.mbassy.listener;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
*
|
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 11/14/12
|
* Date: 11/14/12
|
||||||
*/
|
*/
|
||||||
@ -34,26 +33,24 @@ public class MessageHandlerMetadata {
|
|||||||
this.isAsynchronous = handlerConfig.delivery().equals(Mode.Concurrent);
|
this.isAsynchronous = handlerConfig.delivery().equals(Mode.Concurrent);
|
||||||
this.envelope = handler.getAnnotation(Enveloped.class);
|
this.envelope = handler.getAnnotation(Enveloped.class);
|
||||||
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
|
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
|
||||||
if(this.envelope != null){
|
if (this.envelope != null) {
|
||||||
for(Class messageType : envelope.messages())
|
Collections.addAll(handledMessages, envelope.messages());
|
||||||
handledMessages.add(messageType);
|
} else {
|
||||||
}
|
|
||||||
else{
|
|
||||||
handledMessages.add(handler.getParameterTypes()[0]);
|
handledMessages.add(handler.getParameterTypes()[0]);
|
||||||
}
|
}
|
||||||
this.handler.setAccessible(true);
|
this.handler.setAccessible(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public boolean isAsynchronous(){
|
public boolean isAsynchronous() {
|
||||||
return isAsynchronous;
|
return isAsynchronous;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFiltered(){
|
public boolean isFiltered() {
|
||||||
return filter != null && filter.length > 0;
|
return filter != null && filter.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPriority(){
|
public int getPriority() {
|
||||||
return handlerConfig.priority();
|
return handlerConfig.priority();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,7 +62,7 @@ public class MessageHandlerMetadata {
|
|||||||
return filter;
|
return filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Class<?>> getHandledMessages(){
|
public List<Class<?>> getHandledMessages() {
|
||||||
return handledMessages;
|
return handledMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,15 +70,19 @@ public class MessageHandlerMetadata {
|
|||||||
return envelope != null;
|
return envelope != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean handlesMessage(Class<?> messageType){
|
public boolean handlesMessage(Class<?> messageType) {
|
||||||
for(Class<?> handledMessage : handledMessages){
|
for (Class<?> handledMessage : handledMessages) {
|
||||||
if(handledMessage.equals(messageType))return true;
|
if (handledMessage.equals(messageType)) {
|
||||||
if(handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) return true;
|
return true;
|
||||||
|
}
|
||||||
|
if (handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean acceptsSubtypes(){
|
public boolean acceptsSubtypes() {
|
||||||
return acceptsSubtypes;
|
return acceptsSubtypes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,14 +9,13 @@ import java.util.List;
|
|||||||
* Provides information about the message listeners of a specific class. Each message handler
|
* Provides information about the message listeners of a specific class. Each message handler
|
||||||
* defined by the target class is represented as a single entity.
|
* defined by the target class is represented as a single entity.
|
||||||
*
|
*
|
||||||
*
|
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 12/16/12
|
* Date: 12/16/12
|
||||||
*/
|
*/
|
||||||
public class MessageListenerMetadata<T> {
|
public class MessageListenerMetadata<T> {
|
||||||
|
|
||||||
|
|
||||||
public static final IPredicate<MessageHandlerMetadata> ForMessage(final Class<?> messageType){
|
public static IPredicate<MessageHandlerMetadata> ForMessage(final Class<?> messageType) {
|
||||||
return new IPredicate<MessageHandlerMetadata>() {
|
return new IPredicate<MessageHandlerMetadata>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(MessageHandlerMetadata target) {
|
public boolean apply(MessageHandlerMetadata target) {
|
||||||
@ -35,19 +34,21 @@ public class MessageListenerMetadata<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter){
|
public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter) {
|
||||||
List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>();
|
List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>();
|
||||||
for(MessageHandlerMetadata handler : handlers){
|
for (MessageHandlerMetadata handler : handlers) {
|
||||||
if(filter.apply(handler))matching.add(handler);
|
if (filter.apply(handler)) {
|
||||||
|
matching.add(handler);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return matching;
|
return matching;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean handles(Class<?> messageType){
|
public boolean handles(Class<?> messageType) {
|
||||||
return !getHandlers(ForMessage(messageType)).isEmpty();
|
return !getHandlers(ForMessage(messageType)).isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Class<T> getListerDefinition(){
|
public Class<T> getListerDefinition() {
|
||||||
return listenerDefinition;
|
return listenerDefinition;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,12 @@ import net.engio.mbassy.common.ReflectionUtils;
|
|||||||
import net.engio.mbassy.subscription.MessageEnvelope;
|
import net.engio.mbassy.subscription.MessageEnvelope;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.*;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* The meta data reader is responsible for parsing and validating message handler configurations.
|
* The meta data reader is responsible for parsing and validating message handler configurations.
|
||||||
*
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
@ -28,18 +30,19 @@ public class MetadataReader {
|
|||||||
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
|
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
|
||||||
|
|
||||||
// retrieve all instances of filters associated with the given subscription
|
// retrieve all instances of filters associated with the given subscription
|
||||||
private IMessageFilter[] getFilter(Handler subscription){
|
private IMessageFilter[] getFilter(Handler subscription) {
|
||||||
if (subscription.filters().length == 0) return null;
|
if (subscription.filters().length == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
|
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (Filter filterDef : subscription.filters()) {
|
for (Filter filterDef : subscription.filters()) {
|
||||||
IMessageFilter filter = filterCache.get(filterDef.value());
|
IMessageFilter filter = filterCache.get(filterDef.value());
|
||||||
if (filter == null) {
|
if (filter == null) {
|
||||||
try{
|
try {
|
||||||
filter = filterDef.value().newInstance();
|
filter = filterDef.value().newInstance();
|
||||||
filterCache.put(filterDef.value(), filter);
|
filterCache.put(filterDef.value(), filter);
|
||||||
}
|
} catch (Exception e) {
|
||||||
catch (Exception e){
|
|
||||||
throw new RuntimeException(e);// propagate as runtime exception
|
throw new RuntimeException(e);// propagate as runtime exception
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +54,7 @@ public class MetadataReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler){
|
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) {
|
||||||
Handler config = messageHandler.getAnnotation(Handler.class);
|
Handler config = messageHandler.getAnnotation(Handler.class);
|
||||||
return new MessageHandlerMetadata(messageHandler, getFilter(config), config);
|
return new MessageHandlerMetadata(messageHandler, getFilter(config), config);
|
||||||
}
|
}
|
||||||
@ -63,8 +66,8 @@ public class MetadataReader {
|
|||||||
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
||||||
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
|
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
|
||||||
List<Method> bottomMostHandlers = new LinkedList<Method>();
|
List<Method> bottomMostHandlers = new LinkedList<Method>();
|
||||||
for(Method handler : allHandlers){
|
for (Method handler : allHandlers) {
|
||||||
if(!ReflectionUtils.containsOverridingMethod(allHandlers, handler)){
|
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
|
||||||
bottomMostHandlers.add(handler);
|
bottomMostHandlers.add(handler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -73,9 +76,11 @@ public class MetadataReader {
|
|||||||
List<MessageHandlerMetadata> filteredHandlers = new LinkedList<MessageHandlerMetadata>();
|
List<MessageHandlerMetadata> filteredHandlers = new LinkedList<MessageHandlerMetadata>();
|
||||||
// for each handler there will be no overriding method that specifies @Handler annotation
|
// for each handler there will be no overriding method that specifies @Handler annotation
|
||||||
// but an overriding method does inherit the listener configuration of the overwritten method
|
// but an overriding method does inherit the listener configuration of the overwritten method
|
||||||
for(Method handler : bottomMostHandlers){
|
for (Method handler : bottomMostHandlers) {
|
||||||
Handler handle = handler.getAnnotation(Handler.class);
|
Handler handle = handler.getAnnotation(Handler.class);
|
||||||
if(!handle.enabled() || !isValidMessageHandler(handler)) continue; // disabled or invalid listeners are ignored
|
if (!handle.enabled() || !isValidMessageHandler(handler)) {
|
||||||
|
continue; // disabled or invalid listeners are ignored
|
||||||
|
}
|
||||||
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
|
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
|
||||||
// if a handler is overwritten it inherits the configuration of its parent method
|
// if a handler is overwritten it inherits the configuration of its parent method
|
||||||
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
|
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
|
||||||
@ -92,9 +97,8 @@ public class MetadataReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private boolean isValidMessageHandler(Method handler) {
|
private boolean isValidMessageHandler(Method handler) {
|
||||||
if(handler == null || handler.getAnnotation(Handler.class) == null){
|
if (handler == null || handler.getAnnotation(Handler.class) == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (handler.getParameterTypes().length != 1) {
|
if (handler.getParameterTypes().length != 1) {
|
||||||
@ -104,11 +108,11 @@ public class MetadataReader {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
Enveloped envelope = handler.getAnnotation(Enveloped.class);
|
Enveloped envelope = handler.getAnnotation(Enveloped.class);
|
||||||
if(envelope != null && !MessageEnvelope.class.isAssignableFrom(handler.getParameterTypes()[0])){
|
if (envelope != null && !MessageEnvelope.class.isAssignableFrom(handler.getParameterTypes()[0])) {
|
||||||
System.out.println("Message envelope configured but no subclass of MessageEnvelope found as parameter");
|
System.out.println("Message envelope configured but no subclass of MessageEnvelope found as parameter");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if(envelope != null && envelope.messages().length == 0){
|
if (envelope != null && envelope.messages().length == 0) {
|
||||||
System.out.println("Message envelope configured but message types defined for handler");
|
System.out.println("Message envelope configured but message types defined for handler");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package net.engio.mbassy.listener;
|
package net.engio.mbassy.listener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created with IntelliJ IDEA.
|
* Created with IntelliJ IDEA.
|
||||||
* @author bennidi
|
*
|
||||||
* Date: 11/16/12
|
* @author bennidi
|
||||||
* Time: 10:01 AM
|
* Date: 11/16/12
|
||||||
* To change this template use File | Settings | File Templates.
|
* Time: 10:01 AM
|
||||||
*/
|
* To change this template use File | Settings | File Templates.
|
||||||
|
*/
|
||||||
public enum Mode {
|
public enum Mode {
|
||||||
Sequential, Concurrent
|
Sequential, Concurrent
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import net.engio.mbassy.dispatch.ISubscriptionContextAware;
|
|||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 3/1/13
|
* Date: 3/1/13
|
||||||
*/
|
*/
|
||||||
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware{
|
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware {
|
||||||
|
|
||||||
private SubscriptionContext context;
|
private SubscriptionContext context;
|
||||||
|
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
package net.engio.mbassy.subscription;
|
package net.engio.mbassy.subscription;
|
||||||
|
|
||||||
import java.sql.Timestamp;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A message envelope is used to wrap messages of arbitrary type such that a handler
|
* A message envelope is used to wrap messages of arbitrary type such that a handler
|
||||||
* my receive messages of different types.
|
* my receive messages of different types.
|
||||||
@ -11,17 +9,14 @@ import java.sql.Timestamp;
|
|||||||
*/
|
*/
|
||||||
public class MessageEnvelope {
|
public class MessageEnvelope {
|
||||||
|
|
||||||
private Timestamp tsCreated = new Timestamp(System.currentTimeMillis());
|
// Internal state
|
||||||
|
|
||||||
private Object message;
|
private Object message;
|
||||||
|
|
||||||
|
|
||||||
public MessageEnvelope(Object message) {
|
public MessageEnvelope(Object message) {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T> T getMessage() {
|
||||||
public <T> T getMessage(){
|
return (T) message;
|
||||||
return (T)message;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
package net.engio.mbassy.subscription;
|
package net.engio.mbassy.subscription;
|
||||||
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import net.engio.mbassy.bus.MessagePublication;
|
import net.engio.mbassy.bus.MessagePublication;
|
||||||
import net.engio.mbassy.common.ConcurrentSet;
|
import net.engio.mbassy.common.ConcurrentSet;
|
||||||
import net.engio.mbassy.dispatch.IMessageDispatcher;
|
import net.engio.mbassy.dispatch.IMessageDispatcher;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A subscription is a thread safe container for objects that contain message handlers
|
* A subscription is a thread safe container for objects that contain message handlers
|
||||||
*/
|
*/
|
||||||
@ -26,16 +26,16 @@ public class Subscription {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public boolean handlesMessageType(Class<?> messageType){
|
public boolean handlesMessageType(Class<?> messageType) {
|
||||||
return context.getHandlerMetadata().handlesMessage(messageType);
|
return context.getHandlerMetadata().handlesMessage(messageType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void publish(MessagePublication publication, Object message){
|
public void publish(MessagePublication publication, Object message) {
|
||||||
dispatcher.dispatch(publication, message, listeners);
|
dispatcher.dispatch(publication, message, listeners);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPriority(){
|
public int getPriority() {
|
||||||
return context.getHandlerMetadata().getPriority();
|
return context.getHandlerMetadata().getPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +49,7 @@ public class Subscription {
|
|||||||
return listeners.remove(existingListener);
|
return listeners.remove(existingListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size(){
|
public int size() {
|
||||||
return listeners.size();
|
return listeners.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ public class Subscription {
|
|||||||
@Override
|
@Override
|
||||||
public int compare(Subscription o1, Subscription o2) {
|
public int compare(Subscription o1, Subscription o2) {
|
||||||
int result = o1.getPriority() - o2.getPriority();
|
int result = o1.getPriority() - o2.getPriority();
|
||||||
return result == 0 ? o1.id.compareTo(o2.id): result;
|
return result == 0 ? o1.id.compareTo(o2.id) : result;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@ public class SubscriptionContext {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a reference to the message bus this context belongs to
|
* Get a reference to the message bus this context belongs to
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public IMessageBus getOwningBus() {
|
public IMessageBus getOwningBus() {
|
||||||
@ -35,6 +36,7 @@ public class SubscriptionContext {
|
|||||||
/**
|
/**
|
||||||
* Get the meta data that specifies the characteristics of the message handler
|
* Get the meta data that specifies the characteristics of the message handler
|
||||||
* that is associated with this context
|
* that is associated with this context
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public MessageHandlerMetadata getHandlerMetadata() {
|
public MessageHandlerMetadata getHandlerMetadata() {
|
||||||
|
@ -1,9 +1,16 @@
|
|||||||
package net.engio.mbassy.subscription;
|
package net.engio.mbassy.subscription;
|
||||||
|
|
||||||
import net.engio.mbassy.dispatch.*;
|
import net.engio.mbassy.dispatch.AsynchronousHandlerInvocation;
|
||||||
|
import net.engio.mbassy.dispatch.EnvelopedMessageDispatcher;
|
||||||
|
import net.engio.mbassy.dispatch.FilteredMessageDispatcher;
|
||||||
|
import net.engio.mbassy.dispatch.IHandlerInvocation;
|
||||||
|
import net.engio.mbassy.dispatch.IMessageDispatcher;
|
||||||
|
import net.engio.mbassy.dispatch.MessageDispatcher;
|
||||||
|
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created with IntelliJ IDEA.
|
* Created with IntelliJ IDEA.
|
||||||
|
*
|
||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 11/16/12
|
* Date: 11/16/12
|
||||||
* Time: 10:39 AM
|
* Time: 10:39 AM
|
||||||
@ -11,26 +18,26 @@ import net.engio.mbassy.dispatch.*;
|
|||||||
*/
|
*/
|
||||||
public class SubscriptionFactory {
|
public class SubscriptionFactory {
|
||||||
|
|
||||||
public Subscription createSubscription(SubscriptionContext context){
|
public Subscription createSubscription(SubscriptionContext context) {
|
||||||
IHandlerInvocation invocation = buildInvocationForHandler(context);
|
IHandlerInvocation invocation = buildInvocationForHandler(context);
|
||||||
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
|
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
|
||||||
return new Subscription(context, dispatcher);
|
return new Subscription(context, dispatcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context){
|
protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context) {
|
||||||
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context);
|
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context);
|
||||||
if(context.getHandlerMetadata().isAsynchronous()){
|
if (context.getHandlerMetadata().isAsynchronous()) {
|
||||||
invocation = new AsynchronousHandlerInvocation(invocation);
|
invocation = new AsynchronousHandlerInvocation(invocation);
|
||||||
}
|
}
|
||||||
return invocation;
|
return invocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation){
|
protected IMessageDispatcher buildDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
|
||||||
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
|
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
|
||||||
if(context.getHandlerMetadata().isEnveloped()){
|
if (context.getHandlerMetadata().isEnveloped()) {
|
||||||
dispatcher = new EnvelopedMessageDispatcher(dispatcher);
|
dispatcher = new EnvelopedMessageDispatcher(dispatcher);
|
||||||
}
|
}
|
||||||
if(context.getHandlerMetadata().isFiltered()){
|
if (context.getHandlerMetadata().isFiltered()) {
|
||||||
dispatcher = new FilteredMessageDispatcher(dispatcher);
|
dispatcher = new FilteredMessageDispatcher(dispatcher);
|
||||||
}
|
}
|
||||||
return dispatcher;
|
return dispatcher;
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package net.engio.mbassy;
|
package net.engio.mbassy;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import net.engio.mbassy.common.ConcurrentExecutor;
|
import net.engio.mbassy.common.ConcurrentExecutor;
|
||||||
import net.engio.mbassy.common.ConcurrentSet;
|
import net.engio.mbassy.common.ConcurrentSet;
|
||||||
@ -24,16 +25,18 @@ import java.util.Random;
|
|||||||
*/
|
*/
|
||||||
public class ConcurrentSetTest extends UnitTest {
|
public class ConcurrentSetTest extends UnitTest {
|
||||||
|
|
||||||
|
// Shared state
|
||||||
private int numberOfElements = 100000;
|
private int numberOfElements = 100000;
|
||||||
|
|
||||||
private int numberOfThreads = 50;
|
private int numberOfThreads = 50;
|
||||||
|
|
||||||
|
@Ignore("Currently fails when building as a suite with JDK 1.7.0_15 and Maven 3.0.5 on a Mac")
|
||||||
@Test
|
@Test
|
||||||
public void testIteratorCleanup() {
|
public void testIteratorCleanup() {
|
||||||
|
|
||||||
|
// Assemble
|
||||||
final HashSet<Object> persistingCandidates = new HashSet<Object>();
|
final HashSet<Object> persistingCandidates = new HashSet<Object>();
|
||||||
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
|
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
|
||||||
Random rand = new Random();
|
final Random rand = new Random();
|
||||||
|
|
||||||
for (int i = 0; i < numberOfElements; i++) {
|
for (int i = 0; i < numberOfElements; i++) {
|
||||||
Object candidate = new Object();
|
Object candidate = new Object();
|
||||||
@ -44,7 +47,8 @@ public class ConcurrentSetTest extends UnitTest {
|
|||||||
testSet.add(candidate);
|
testSet.add(candidate);
|
||||||
}
|
}
|
||||||
|
|
||||||
// this will remove all objects that have not been inserted into the set of persisting candidates
|
// Remove/Garbage collect all objects that have not
|
||||||
|
// been inserted into the set of persisting candidates.
|
||||||
runGC();
|
runGC();
|
||||||
|
|
||||||
ConcurrentExecutor.runConcurrent(new Runnable() {
|
ConcurrentExecutor.runConcurrent(new Runnable() {
|
||||||
@ -62,8 +66,6 @@ public class ConcurrentSetTest extends UnitTest {
|
|||||||
for (Object test : testSet) {
|
for (Object test : testSet) {
|
||||||
assertTrue(persistingCandidates.contains(test));
|
assertTrue(persistingCandidates.contains(test));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -100,8 +102,6 @@ public class ConcurrentSetTest extends UnitTest {
|
|||||||
for (Object uniqueObject : distinct) {
|
for (Object uniqueObject : distinct) {
|
||||||
assertTrue(testSet.contains(uniqueObject));
|
assertTrue(testSet.contains(uniqueObject));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -1,14 +1,16 @@
|
|||||||
package net.engio.mbassy.bus;
|
package net.engio.mbassy.bus;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.BusConfiguration;
|
|
||||||
import net.engio.mbassy.bus.MBassador;
|
|
||||||
import net.engio.mbassy.common.MessageBusTest;
|
import net.engio.mbassy.common.MessageBusTest;
|
||||||
import net.engio.mbassy.events.SubTestMessage;
|
|
||||||
import org.junit.Test;
|
|
||||||
import net.engio.mbassy.common.TestUtil;
|
import net.engio.mbassy.common.TestUtil;
|
||||||
|
import net.engio.mbassy.events.SubTestMessage;
|
||||||
import net.engio.mbassy.events.TestMessage;
|
import net.engio.mbassy.events.TestMessage;
|
||||||
import net.engio.mbassy.listeners.*;
|
import net.engio.mbassy.listeners.EventingTestBean;
|
||||||
|
import net.engio.mbassy.listeners.EventingTestBean2;
|
||||||
|
import net.engio.mbassy.listeners.EventingTestBean3;
|
||||||
|
import net.engio.mbassy.listeners.ListenerFactory;
|
||||||
|
import net.engio.mbassy.listeners.NonListeningBean;
|
||||||
import net.engio.mbassy.subscription.Subscription;
|
import net.engio.mbassy.subscription.Subscription;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
package net.engio.mbassy.common;
|
package net.engio.mbassy.common;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import net.engio.mbassy.*;
|
import net.engio.mbassy.IPublicationErrorHandler;
|
||||||
|
import net.engio.mbassy.PublicationError;
|
||||||
import net.engio.mbassy.bus.BusConfiguration;
|
import net.engio.mbassy.bus.BusConfiguration;
|
||||||
import net.engio.mbassy.bus.MBassador;
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
|
||||||
@ -12,7 +13,7 @@ import net.engio.mbassy.bus.MBassador;
|
|||||||
* @author bennidi
|
* @author bennidi
|
||||||
* Date: 3/2/13
|
* Date: 3/2/13
|
||||||
*/
|
*/
|
||||||
public class MessageBusTest extends UnitTest{
|
public class MessageBusTest extends UnitTest {
|
||||||
|
|
||||||
private static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
|
private static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
|
||||||
@Override
|
@Override
|
||||||
@ -21,11 +22,9 @@ public class MessageBusTest extends UnitTest{
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public MBassador getBus(BusConfiguration configuration){
|
public MBassador getBus(BusConfiguration configuration) {
|
||||||
MBassador bus = new MBassador(configuration);
|
MBassador bus = new MBassador(configuration);
|
||||||
bus.addErrorHandler(TestFailingHandler);
|
bus.addErrorHandler(TestFailingHandler);
|
||||||
return bus;
|
return bus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -13,12 +13,14 @@ import java.lang.ref.WeakReference;
|
|||||||
*/
|
*/
|
||||||
public class UnitTest {
|
public class UnitTest {
|
||||||
|
|
||||||
|
// Internal state
|
||||||
|
private Runtime runtime = Runtime.getRuntime();
|
||||||
|
|
||||||
public void pause(long ms) {
|
public void pause(long ms) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(ms);
|
Thread.sleep(ms);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,7 +32,7 @@ public class UnitTest {
|
|||||||
public void runGC() {
|
public void runGC() {
|
||||||
WeakReference ref = new WeakReference<Object>(new Object());
|
WeakReference ref = new WeakReference<Object>(new Object());
|
||||||
while(ref.get() != null) {
|
while(ref.get() != null) {
|
||||||
System.gc();
|
runtime.gc();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user