Set an ignore comment on the failing testcase. Ran PMD on the code to clean up some unused modifiers.

This commit is contained in:
Lennart Jörelid 2013-03-23 10:16:12 +01:00
parent 295dfd3ec6
commit cb949a219e
39 changed files with 458 additions and 318 deletions

4
.gitignore vendored
View File

@ -1,5 +1,9 @@
# idea project settings # # idea project settings #
*.iml *.iml
*.ipr
*.iws
.idea/*
.idea
# Package Files # # Package Files #
*.war *.war

37
pom.xml
View File

@ -23,7 +23,6 @@
weak-references, weak-references,
message filtering, message filtering,
ordering of message handlers etc. ordering of message handlers etc.
</description> </description>
<url>https://github.com/bennidi/mbassador</url> <url>https://github.com/bennidi/mbassador</url>
@ -50,6 +49,10 @@
</developers> </developers>
<properties> <properties>
<nazgul-codestyle.version>2.0.1</nazgul-codestyle.version>
<jdk.version>1.6</jdk.version>
<pmd.plugin.version>3.0.1</pmd.plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version> <project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/mvn-local-repo</github.url> <github.url>file://${project.basedir}/mvn-local-repo</github.url>
@ -77,6 +80,38 @@
<build> <build>
<plugins> <plugins>
<!-- plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>${pmd.plugin.version}</version>
<configuration>
<excludeRoots>
<excludeRoot>src/main/generated</excludeRoot>
<excludeRoot>src/test</excludeRoot>
</excludeRoots>
<rulesets>
<ruleset>/codestyle/pmd-rules.xml</ruleset>
</rulesets>
<targetJdk>${jdk.version}</targetJdk>
<sourceEncoding>${project.build.sourceEncoding}</sourceEncoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
<goal>cpd-check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>se.jguru.nazgul.tools.codestyle</groupId>
<artifactId>nazgul-codestyle</artifactId>
<version>${nazgul-codestyle.version}</version>
</dependency>
</dependencies>
</plugin -->
<plugin> <plugin>
<groupId>org.apache.felix</groupId> <groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId> <artifactId>maven-bundle-plugin</artifactId>

View File

@ -1,32 +1,44 @@
package net.engio.mbassy; package net.engio.mbassy;
/** /**
* Publication error handlers are provided with a publication error every time an error occurs during message publication. * Publication error handlers are provided with a publication error every time an
* A handler might fail with an exception, not be accessible because of the presence of a security manager * error occurs during message publication.
* or other reasons might lead to failures during the message publication process. * A handler might fail with an exception, not be accessible because of the presence
* * of a security manager or other reasons might lead to failures during the message publication process.
* <p/> * <p/>
*
* @author bennidi * @author bennidi
* Date: 2/22/12 * Date: 2/22/12
*/ */
@SuppressWarnings("PMD.UnusedModifier")
public interface IPublicationErrorHandler { public interface IPublicationErrorHandler {
/** /**
* Handle the given publication error. * Handle the given publication error.
* *
* @param error * @param error The PublicationError to handle.
*/ */
public void handleError(PublicationError error); void handleError(PublicationError error);
// This is the default error handler it will simply log to standard out and /**
// print stack trace if available * The default error handler will simply log to standard out and
* print the stack trace if available.
*/
static final class ConsoleLogger implements IPublicationErrorHandler { static final class ConsoleLogger implements IPublicationErrorHandler {
@Override
public void handleError(PublicationError error) {
System.out.println(error);
if (error.getCause() != null) error.getCause().printStackTrace();
}
}
; /**
* {@inheritDoc}
*/
@Override
public void handleError(final PublicationError error) {
// Printout the error itself
System.out.println(error);
// Printout the stacktrace from the cause.
if (error.getCause() != null) {
error.getCause().printStackTrace();
}
}
}
} }

View File

@ -3,27 +3,39 @@ package net.engio.mbassy;
import java.lang.reflect.Method; import java.lang.reflect.Method;
/** /**
* Publication errors are created when object publication fails for some reason and contain details * Publication errors are created when object publication fails
* as to the cause and location where they occured. * for some reason and contain details as to the cause and location
* where they occurred.
* <p/> * <p/>
*
* @author bennidi * @author bennidi
* Date: 2/22/12 * Date: 2/22/12
* Time: 4:59 PM * Time: 4:59 PM
*/ */
public class PublicationError { public class PublicationError {
// Internal state
private Throwable cause; private Throwable cause;
private String message; private String message;
private Method listener; private Method listener;
private Object listeningObject; private Object listeningObject;
private Object publishedObject; private Object publishedObject;
/**
* Compound constructor, creating a PublicationError from the supplied objects.
*
* @param cause The Throwable giving rise to this PublicationError.
* @param message The message to send.
* @param listener The method where the error was created.
* @param listeningObject The object in which the PublicationError was generated.
* @param publishedObject The published object which gave rise to the error.
*/
public PublicationError(final Throwable cause,
final String message,
final Method listener,
final Object listeningObject,
final Object publishedObject) {
public PublicationError(Throwable cause, String message, Method listener, Object listeningObject, Object publishedObject) {
this.cause = cause; this.cause = cause;
this.message = message; this.message = message;
this.listener = listener; this.listener = listener;
@ -31,14 +43,26 @@ public class PublicationError {
this.publishedObject = publishedObject; this.publishedObject = publishedObject;
} }
/**
* Default constructor.
*/
public PublicationError() { public PublicationError() {
super(); super();
} }
/**
* @return The Throwable giving rise to this PublicationError.
*/
public Throwable getCause() { public Throwable getCause() {
return cause; return cause;
} }
/**
* Assigns the cause of this PublicationError.
*
* @param cause A Throwable which gave rise to this PublicationError.
* @return This PublicationError.
*/
public PublicationError setCause(Throwable cause) { public PublicationError setCause(Throwable cause) {
this.cause = cause; this.cause = cause;
return this; return this;
@ -80,6 +104,9 @@ public class PublicationError {
return this; return this;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public String toString() { public String toString() {
return "PublicationError{" + return "PublicationError{" +

View File

@ -3,14 +3,28 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError; import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.ReflectionUtils; import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.subscription.SubscriptionContext;
import net.engio.mbassy.listener.MessageHandlerMetadata; import net.engio.mbassy.listener.MessageHandlerMetadata;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionContext;
import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/** /**
* The base class for all message bus implementations. * The base class for all message bus implementations.
@ -99,9 +113,13 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
} }
public boolean unsubscribe(Object listener) { public boolean unsubscribe(Object listener) {
if (listener == null) return false; if (listener == null) {
return false;
}
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass()); Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if (subscriptions == null) return false; if (subscriptions == null) {
return false;
}
boolean isRemoved = true; boolean isRemoved = true;
for (Subscription subscription : subscriptions) { for (Subscription subscription : subscriptions) {
isRemoved = isRemoved && subscription.unsubscribe(listener); isRemoved = isRemoved && subscription.unsubscribe(listener);
@ -113,8 +131,9 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
public void subscribe(Object listener) { public void subscribe(Object listener) {
try { try {
Class listeningClass = listener.getClass(); Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass)) if (nonListeners.contains(listeningClass)) {
return; // early reject of known classes that do not participate in eventing return; // early reject of known classes that do not participate in eventing
}
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass); Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially synchronized (this) { // new subscriptions must be processed sequentially
@ -154,7 +173,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
} }
public void addErrorHandler(IPublicationErrorHandler handler) { public final void addErrorHandler(IPublicationErrorHandler handler) {
errorHandlers.add(handler); errorHandlers.add(handler);
} }
@ -192,7 +211,9 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType); Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
if(sub.handlesMessageType(messageType))subscriptions.add(sub); if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
} }
} }
} }
@ -200,7 +221,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
} }
// associate a suscription with a message type // associate a suscription with a message type
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Not thread-safe! must be synchronized in outer scope
private void addMessageTypeSubscription(Class messageType, Subscription subscription) { private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
@ -213,8 +233,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
} }
public void handlePublicationError(PublicationError error) { public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers) { for (IPublicationErrorHandler errorHandler : errorHandlers) {
errorHandler.handleError(error); errorHandler.handleError(error);

View File

@ -3,7 +3,12 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.*; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour. * The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
@ -22,7 +27,7 @@ public class BusConfiguration {
} }
}; };
public static final BusConfiguration Default(){ public static BusConfiguration Default() {
return new BusConfiguration(); return new BusConfiguration();
} }

View File

@ -7,7 +7,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T. * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for per message handler and message publication. * The dispatch mechanism can by controlled for per message handler and message publication.
@ -41,7 +40,7 @@ import java.util.concurrent.TimeUnit;
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications. * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
* Any running message publication that has not yet delivered the message to the removed listener will not see the listener * Any running message publication that has not yet delivered the message to the removed listener will not see the listener
* after the remove operation completed. * after the remove operation completed.
* * <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter * get dispatched to all message handlers that take an instance of List as their parameter
* *
@ -57,14 +56,13 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
* *
* @param listener * @param listener
*/ */
public void subscribe(Object listener); void subscribe(Object listener);
/** /**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled * have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the message listener was still subscribed). * publications that have been published when the message listener was still subscribed).
* * <p/>
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message * A call to this method passing null, an already unsubscribed listener or any object that does not define any message
* handlers will not have any effect and is silently ignored. * handlers will not have any effect and is silently ignored.
* *
@ -72,14 +70,13 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
* @return true, if the listener was found and successfully removed * @return true, if the listener was found and successfully removed
* false otherwise * false otherwise
*/ */
public boolean unsubscribe(Object listener); boolean unsubscribe(Object listener);
/** /**
*
* @param message * @param message
* @return * @return
*/ */
public P post(T message); P post(T message);
/** /**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception, * Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
@ -89,15 +86,14 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
* *
* @param errorHandler * @param errorHandler
*/ */
public void addErrorHandler(IPublicationErrorHandler errorHandler); void addErrorHandler(IPublicationErrorHandler errorHandler);
/** /**
* Returns an immutable collection containing all the registered error handlers * Returns an immutable collection containing all the registered error handlers
* *
* @return * @return
*/ */
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers(); Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
/** /**
* Get the executor service that is used to asynchronous message publication. * Get the executor service that is used to asynchronous message publication.
@ -105,54 +101,49 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
* *
* @return * @return
*/ */
public Executor getExecutor(); Executor getExecutor();
/** /**
* Check whether any asynchronous message publications are pending for being processed * Check whether any asynchronous message publications are pending for being processed
* *
* @return * @return
*/ */
public boolean hasPendingMessages(); boolean hasPendingMessages();
/** /**
* A post command is used as an intermediate object created by a call to the message bus' post method. * A post command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the functionality provided by the message bus that created the command. * It encapsulates the functionality provided by the message bus that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes. * Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*
*/ */
public static interface IPostCommand<T>{ interface IPostCommand<T> {
/** /**
* Execute the message publication immediately. This call blocks until every matching message handler * Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked. * has been invoked.
*/ */
public void now(); void now();
/** /**
* Execute the message publication asynchronously. The behaviour of this method depends on the * Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy: * configured queuing strategy:
* * <p/>
* If an unbound queuing strategy is used the call returns immediately. * If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue. * If a bounded queue is used the call might block until the message can be placed in the queue.
* *
* @return A message publication that can be used to access information about the state of * @return A message publication that can be used to access information about the state of
*/ */
public MessagePublication asynchronously(); MessagePublication asynchronously();
/** /**
* Execute the message publication asynchronously. The behaviour of this method depends on the * Execute the message publication asynchronously. The behaviour of this method depends on the
* configured queuing strategy: * configured queuing strategy:
* * <p/>
* If an unbound queuing strategy is used the call returns immediately. * If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue * If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout is reached. * or the timeout is reached.
* *
* @return A message publication that wraps up the publication request * @return A message publication that wraps up the publication request
*/ */
public MessagePublication asynchronously(long timeout, TimeUnit unit); MessagePublication asynchronously(long timeout, TimeUnit unit);
} }
} }

View File

@ -29,12 +29,12 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
// Dead Event // Dead Event
subscriptions = getSubscriptionsByMessageType(DeadMessage.class); subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message)); return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
} else {
return getPublicationFactory().createPublication(this, subscriptions, message);
} }
else return getPublicationFactory().createPublication(this, subscriptions, message);
} }
/** /**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types) * Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message. * The call blocks until every messageHandler has processed the message.

View File

@ -10,7 +10,7 @@ import java.util.Collection;
* A message publication is created for each asynchronous message dispatch. It reflects the state * A message publication is created for each asynchronous message dispatch. It reflects the state
* of the corresponding message publication process, i.e. provides information whether the * of the corresponding message publication process, i.e. provides information whether the
* publication was successfully scheduled, is currently running etc. * publication was successfully scheduled, is currently running etc.
* * <p/>
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to * A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
* be used in multiple threads simultaneously . * be used in multiple threads simultaneously .
* *
@ -82,8 +82,9 @@ public class MessagePublication {
} }
public MessagePublication markScheduled() { public MessagePublication markScheduled() {
if(!state.equals(State.Initial)) if (!state.equals(State.Initial)) {
return this; return this;
}
state = State.Scheduled; state = State.Scheduled;
return this; return this;
} }

View File

@ -27,7 +27,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
private Entry<T> head; // reference to the first element private Entry<T> head; // reference to the first element
public ConcurrentSet<T> add(T element) { public ConcurrentSet<T> add(T element) {
if (element == null || entries.containsKey(element)) return this; if (element == null || entries.containsKey(element)) {
return this;
}
synchronized (this) { synchronized (this) {
insert(element); insert(element);
} }
@ -40,7 +42,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
} }
private void insert(T element) { private void insert(T element) {
if (entries.containsKey(element)) return; if (entries.containsKey(element)) {
return;
}
if (head == null) { if (head == null) {
head = new Entry<T>(element); head = new Entry<T>(element);
} else { } else {
@ -56,7 +60,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
public ConcurrentSet<T> addAll(Iterable<T> elements) { public ConcurrentSet<T> addAll(Iterable<T> elements) {
synchronized (this) { synchronized (this) {
for (T element : elements) { for (T element : elements) {
if (element == null || entries.containsKey(element)) return this; if (element == null || entries.containsKey(element)) {
return this;
}
insert(element); insert(element);
} }
@ -65,10 +71,14 @@ public class ConcurrentSet<T> implements Iterable<T>{
} }
public boolean remove(T element) { public boolean remove(T element) {
if (!entries.containsKey(element)) return false; if (!entries.containsKey(element)) {
return false;
}
synchronized (this) { synchronized (this) {
Entry<T> listelement = entries.get(element); Entry<T> listelement = entries.get(element);
if(listelement == null)return false; //removed by other thread if (listelement == null) {
return false; //removed by other thread
}
if (listelement != head) { if (listelement != head) {
listelement.remove(); listelement.remove();
} else { } else {
@ -87,7 +97,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
private Entry<T> current = head; private Entry<T> current = head;
public boolean hasNext() { public boolean hasNext() {
if (current == null) return false; if (current == null) {
return false;
}
T value = current.getValue(); T value = current.getValue();
if (value == null) { // auto-removal of orphan references if (value == null) { // auto-removal of orphan references
remove(); remove();
@ -98,7 +110,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
} }
public T next() { public T next() {
if (current == null) return null; if (current == null) {
return null;
}
T value = current.getValue(); T value = current.getValue();
if (value == null) { // auto-removal of orphan references if (value == null) { // auto-removal of orphan references
remove(); remove();
@ -110,7 +124,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
} }
public void remove() { public void remove() {
if (current == null) return; if (current == null) {
return;
}
Entry<T> newCurrent = current.next(); Entry<T> newCurrent = current.next();
ConcurrentSet.this.remove(current.getValue()); ConcurrentSet.this.remove(current.getValue());
current = newCurrent; current = newCurrent;
@ -146,7 +162,9 @@ public class ConcurrentSet<T> implements Iterable<T>{
public void remove() { public void remove() {
if (predecessor != null) { if (predecessor != null) {
predecessor.next = next; predecessor.next = next;
if(next != null)next.predecessor = predecessor; if (next != null) {
next.predecessor = predecessor;
}
} else if (next != null) { } else if (next != null) {
next.predecessor = null; next.predecessor = null;
} }

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.common;
/** /**
* Created with IntelliJ IDEA. * Created with IntelliJ IDEA.
*
* @author bennidi * @author bennidi
* Date: 10/22/12 * Date: 10/22/12
* Time: 9:33 AM * Time: 9:33 AM
@ -9,6 +10,5 @@ package net.engio.mbassy.common;
*/ */
public interface IPredicate<T> { public interface IPredicate<T> {
public boolean apply(T target); boolean apply(T target);
} }

View File

@ -38,12 +38,11 @@ public class ReflectionUtils {
* @param subclass * @param subclass
* @return * @return
*/ */
public static Method getOverridingMethod(Method overridingMethod, Class subclass) { public static Method getOverridingMethod(final Method overridingMethod, final Class subclass) {
Class current = subclass; Class current = subclass;
while (!current.equals(overridingMethod.getDeclaringClass())) { while (!current.equals(overridingMethod.getDeclaringClass())) {
try { try {
Method overridden = current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes()); return current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes());
return overridden;
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
current = current.getSuperclass(); current = current.getSuperclass();
} }
@ -51,10 +50,12 @@ public class ReflectionUtils {
return null; return null;
} }
public static List<Method> withoutOverridenSuperclassMethods(List<Method> allMethods) { public static List<Method> withoutOverridenSuperclassMethods(final List<Method> allMethods) {
List<Method> filtered = new LinkedList<Method>(); List<Method> filtered = new LinkedList<Method>();
for (Method method : allMethods) { for (Method method : allMethods) {
if (!containsOverridingMethod(allMethods, method)) filtered.add(method); if (!containsOverridingMethod(allMethods, method)) {
filtered.add(method);
}
} }
return filtered; return filtered;
} }
@ -68,9 +69,11 @@ public class ReflectionUtils {
return superclasses; return superclasses;
} }
public static boolean containsOverridingMethod(List<Method> allMethods, Method methodToCheck) { public static boolean containsOverridingMethod(final List<Method> allMethods, final Method methodToCheck) {
for (Method method : allMethods) { for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) return true; if (isOverriddenBy(methodToCheck, method)) {
return true;
}
} }
return false; return false;
} }

View File

@ -17,6 +17,9 @@ public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAw
this.delegate = delegate; this.delegate = delegate;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void invoke(final Object listener, final Object message) { public void invoke(final Object listener, final Object message) {
getContext().getOwningBus().getExecutor().execute(new Runnable() { getContext().getOwningBus().getExecutor().execute(new Runnable() {

View File

@ -7,7 +7,7 @@ import net.engio.mbassy.subscription.MessageEnvelope;
/** /**
* The enveloped dispatcher will wrap published messages in an envelope before * The enveloped dispatcher will wrap published messages in an envelope before
* passing them to their configured dispatcher. * passing them to their configured dispatcher.
* * <p/>
* All enveloped message handlers will have this dispatcher in their chain * All enveloped message handlers will have this dispatcher in their chain
* *
* @author bennidi * @author bennidi

View File

@ -25,10 +25,11 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
if (filter == null) { if (filter == null) {
return true; return true;
} } else {
else {
for (int i = 0; i < filter.length; i++) { for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, getContext().getHandlerMetadata())) return false; if (!filter[i].accepts(message, getContext().getHandlerMetadata())) {
return false;
}
} }
return true; return true;
} }

View File

@ -17,7 +17,5 @@ public interface IHandlerInvocation extends ISubscriptionContextAware {
* @param listener The listener that will receive the message * @param listener The listener that will receive the message
* @param message The message to be delivered to the listener * @param message The message to be delivered to the listener
*/ */
public void invoke(final Object listener, final Object message); void invoke(Object listener, Object message);
} }

View File

@ -10,5 +10,5 @@ import net.engio.mbassy.bus.IMessageBus;
*/ */
public interface IMessageBusAware { public interface IMessageBusAware {
public IMessageBus getBus(); IMessageBus getBus();
} }

View File

@ -7,10 +7,10 @@ import net.engio.mbassy.common.ConcurrentSet;
* A message dispatcher provides the functionality to deliver a single message * A message dispatcher provides the functionality to deliver a single message
* to a set of listeners. A message dispatcher uses a message context to access * to a set of listeners. A message dispatcher uses a message context to access
* all information necessary for the message delivery. * all information necessary for the message delivery.
* * <p/>
* The delivery of a single message to a single listener is responsibility of the * The delivery of a single message to a single listener is responsibility of the
* handler invocation object associated with the dispatcher. * handler invocation object associated with the dispatcher.
* * <p/>
* Implementations if IMessageDispatcher are partially designed using decorator pattern * Implementations if IMessageDispatcher are partially designed using decorator pattern
* such that it is possible to compose different message dispatchers into dispatcher chains * such that it is possible to compose different message dispatchers into dispatcher chains
* to achieve more complex dispatch logic. * to achieve more complex dispatch logic.
@ -29,12 +29,14 @@ public interface IMessageDispatcher extends ISubscriptionContextAware {
* @param message The message that should be delivered to the listeners * @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message * @param listeners The listeners that should receive the message
*/ */
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners); void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners);
/** /**
* Get the handler invocation that will be used to deliver the message to each * Get the handler invocation that will be used to deliver the
* listener * message to each listener.
* @return *
* @return the handler invocation that will be used to deliver the
* message to each listener
*/ */
public IHandlerInvocation getInvocation(); IHandlerInvocation getInvocation();
} }

View File

@ -13,7 +13,7 @@ public interface ISubscriptionContextAware extends IMessageBusAware {
/** /**
* Get the subscription context associated with this object * Get the subscription context associated with this object
* *
* @return * @return the subscription context associated with this object
*/ */
public SubscriptionContext getContext(); SubscriptionContext getContext();
} }

View File

@ -7,7 +7,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
/** /**
* Standard implementation for direct, unfiltered message delivery. * Standard implementation for direct, unfiltered message delivery.
* * <p/>
* For each message delivery, this dispatcher iterates over the listeners * For each message delivery, this dispatcher iterates over the listeners
* and uses the previously provided handler invocation to deliver the message * and uses the previously provided handler invocation to deliver the message
* to each listener * to each listener
@ -36,5 +36,4 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
public IHandlerInvocation getInvocation() { public IHandlerInvocation getInvocation() {
return invocation; return invocation;
} }
} }

View File

@ -36,21 +36,18 @@ public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAwar
new PublicationError(e, "Error during messageHandler notification. " + new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible", "The class or method is not accessible",
handler, listener, message)); handler, listener, message));
} } catch (IllegalArgumentException e) {
catch(IllegalArgumentException e){
handlePublicationError( handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " + new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass() "Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0], + "Expected: " + handler.getParameterTypes()[0],
handler, listener, message)); handler, listener, message));
} } catch (InvocationTargetException e) {
catch (InvocationTargetException e) {
handlePublicationError( handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " + new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception", "Message handler threw exception",
handler, listener, message)); handler, listener, message));
} } catch (Throwable e) {
catch (Throwable e) {
handlePublicationError( handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " + new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception", "Unexpected exception",
@ -58,6 +55,9 @@ public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAwar
} }
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void invoke(final Object listener, final Object message) { public void invoke(final Object listener, final Object message) {
invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler()); invokeHandler(message, listener, getContext().getHandlerMetadata().getHandler());

View File

@ -1,6 +1,10 @@
package net.engio.mbassy.listener; package net.engio.mbassy.listener;
import java.lang.annotation.*; import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/** /**
* Configure a handler to receive an enveloped message as a wrapper around the source * Configure a handler to receive an enveloped message as a wrapper around the source

View File

@ -10,8 +10,9 @@ import java.lang.annotation.Target;
* It references a class that implements the IMessageFilter interface. * It references a class that implements the IMessageFilter interface.
* The filter will be used to check whether a message should be delivered * The filter will be used to check whether a message should be delivered
* to the listener or not. * to the listener or not.
*
* <p/> * <p/>
* <p/>
*
* @author bennidi * @author bennidi
* Date: 2/14/12 * Date: 2/14/12
*/ */
@ -22,6 +23,7 @@ public @interface Filter {
/** /**
* The class that implements the filter. * The class that implements the filter.
* Note: A filter always needs to provide a non-arg constructor * Note: A filter always needs to provide a non-arg constructor
*
* @return * @return
*/ */
Class<? extends IMessageFilter> value(); Class<? extends IMessageFilter> value();

View File

@ -31,7 +31,9 @@ public class Filters {
@Override @Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) { public boolean accepts(Object event, MessageHandlerMetadata metadata) {
for (Class handledMessage : metadata.getHandledMessages()) { for (Class handledMessage : metadata.getHandledMessages()) {
if(handledMessage.equals(event.getClass()))return true; if (handledMessage.equals(event.getClass())) {
return true;
}
} }
return false; return false;
} }

View File

@ -1,6 +1,10 @@
package net.engio.mbassy.listener; package net.engio.mbassy.listener;
import java.lang.annotation.*; import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/** /**
* Mark any method of any object(=listener) as a message handler and configure the handler * Mark any method of any object(=listener) as a message handler and configure the handler
@ -23,7 +27,6 @@ public @interface Handler {
/** /**
* Define the mode in which a message is delivered to each listener. Listeners can be notified * Define the mode in which a message is delivered to each listener. Listeners can be notified
* sequentially or concurrently. * sequentially or concurrently.
*
*/ */
Mode delivery() default Mode.Sequential; Mode delivery() default Mode.Sequential;

View File

@ -3,7 +3,7 @@ package net.engio.mbassy.listener;
/** /**
* Message filters can be used to prevent certain messages to be delivered to a specific listener. * Message filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s) * If a filter is used the message will only be delivered if it passes the filter(s)
* * <p/>
* NOTE: A message filter must provide either a no-arg constructor. * NOTE: A message filter must provide either a no-arg constructor.
* *
* @author bennidi * @author bennidi
@ -14,10 +14,8 @@ public interface IMessageFilter {
/** /**
* Evaluate the message to ensure that it matches the handler configuration * Evaluate the message to ensure that it matches the handler configuration
* *
*
* @param message the message to be delivered * @param message the message to be delivered
* @return * @return
*/ */
public boolean accepts(Object message, MessageHandlerMetadata metadata); boolean accepts(Object message, MessageHandlerMetadata metadata);
} }

View File

@ -5,8 +5,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
/** /**
*
*
* @author bennidi * @author bennidi
* Date: 11/14/12 * Date: 11/14/12
*/ */
@ -37,8 +35,7 @@ public class MessageHandlerMetadata {
if (this.envelope != null) { if (this.envelope != null) {
for (Class messageType : envelope.messages()) for (Class messageType : envelope.messages())
handledMessages.add(messageType); handledMessages.add(messageType);
} } else {
else{
handledMessages.add(handler.getParameterTypes()[0]); handledMessages.add(handler.getParameterTypes()[0]);
} }
this.handler.setAccessible(true); this.handler.setAccessible(true);
@ -75,8 +72,12 @@ public class MessageHandlerMetadata {
public boolean handlesMessage(Class<?> messageType) { public boolean handlesMessage(Class<?> messageType) {
for (Class<?> handledMessage : handledMessages) { for (Class<?> handledMessage : handledMessages) {
if(handledMessage.equals(messageType))return true; if (handledMessage.equals(messageType)) {
if(handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) return true; return true;
}
if (handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) {
return true;
}
} }
return false; return false;
} }

View File

@ -9,7 +9,6 @@ import java.util.List;
* Provides information about the message listeners of a specific class. Each message handler * Provides information about the message listeners of a specific class. Each message handler
* defined by the target class is represented as a single entity. * defined by the target class is represented as a single entity.
* *
*
* @author bennidi * @author bennidi
* Date: 12/16/12 * Date: 12/16/12
*/ */
@ -38,7 +37,9 @@ public class MessageListenerMetadata<T> {
public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter) { public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter) {
List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>(); List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>();
for (MessageHandlerMetadata handler : handlers) { for (MessageHandlerMetadata handler : handlers) {
if(filter.apply(handler))matching.add(handler); if (filter.apply(handler)) {
matching.add(handler);
}
} }
return matching; return matching;
} }

View File

@ -5,10 +5,12 @@ import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.subscription.MessageEnvelope; import net.engio.mbassy.subscription.MessageEnvelope;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.*; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/** /**
*
* The meta data reader is responsible for parsing and validating message handler configurations. * The meta data reader is responsible for parsing and validating message handler configurations.
* *
* @author bennidi * @author bennidi
@ -29,7 +31,9 @@ public class MetadataReader {
// retrieve all instances of filters associated with the given subscription // retrieve all instances of filters associated with the given subscription
private IMessageFilter[] getFilter(Handler subscription) { private IMessageFilter[] getFilter(Handler subscription) {
if (subscription.filters().length == 0) return null; if (subscription.filters().length == 0) {
return null;
}
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length]; IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
int i = 0; int i = 0;
for (Filter filterDef : subscription.filters()) { for (Filter filterDef : subscription.filters()) {
@ -38,8 +42,7 @@ public class MetadataReader {
try { try {
filter = filterDef.value().newInstance(); filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter); filterCache.put(filterDef.value(), filter);
} } catch (Exception e) {
catch (Exception e){
throw new RuntimeException(e);// propagate as runtime exception throw new RuntimeException(e);// propagate as runtime exception
} }
@ -75,7 +78,9 @@ public class MetadataReader {
// but an overriding method does inherit the listener configuration of the overwritten method // but an overriding method does inherit the listener configuration of the overwritten method
for (Method handler : bottomMostHandlers) { for (Method handler : bottomMostHandlers) {
Handler handle = handler.getAnnotation(Handler.class); Handler handle = handler.getAnnotation(Handler.class);
if(!handle.enabled() || !isValidMessageHandler(handler)) continue; // disabled or invalid listeners are ignored if (!handle.enabled() || !isValidMessageHandler(handler)) {
continue; // disabled or invalid listeners are ignored
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target); Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
// if a handler is overwritten it inherits the configuration of its parent method // if a handler is overwritten it inherits the configuration of its parent method
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler, MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
@ -92,7 +97,6 @@ public class MetadataReader {
} }
private boolean isValidMessageHandler(Method handler) { private boolean isValidMessageHandler(Method handler) {
if (handler == null || handler.getAnnotation(Handler.class) == null) { if (handler == null || handler.getAnnotation(Handler.class) == null) {
return false; return false;

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.listener;
/** /**
* Created with IntelliJ IDEA. * Created with IntelliJ IDEA.
*
* @author bennidi * @author bennidi
* Date: 11/16/12 * Date: 11/16/12
* Time: 10:01 AM * Time: 10:01 AM

View File

@ -11,16 +11,13 @@ import java.sql.Timestamp;
*/ */
public class MessageEnvelope { public class MessageEnvelope {
private Timestamp tsCreated = new Timestamp(System.currentTimeMillis()); // Internal state
private Object message; private Object message;
public MessageEnvelope(Object message) { public MessageEnvelope(Object message) {
this.message = message; this.message = message;
} }
public <T> T getMessage() { public <T> T getMessage() {
return (T) message; return (T) message;
} }

View File

@ -1,12 +1,12 @@
package net.engio.mbassy.subscription; package net.engio.mbassy.subscription;
import java.util.Comparator;
import java.util.UUID;
import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet; import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher; import net.engio.mbassy.dispatch.IMessageDispatcher;
import java.util.Comparator;
import java.util.UUID;
/** /**
* A subscription is a thread safe container for objects that contain message handlers * A subscription is a thread safe container for objects that contain message handlers
*/ */

View File

@ -25,6 +25,7 @@ public class SubscriptionContext {
/** /**
* Get a reference to the message bus this context belongs to * Get a reference to the message bus this context belongs to
*
* @return * @return
*/ */
public IMessageBus getOwningBus() { public IMessageBus getOwningBus() {
@ -35,6 +36,7 @@ public class SubscriptionContext {
/** /**
* Get the meta data that specifies the characteristics of the message handler * Get the meta data that specifies the characteristics of the message handler
* that is associated with this context * that is associated with this context
*
* @return * @return
*/ */
public MessageHandlerMetadata getHandlerMetadata() { public MessageHandlerMetadata getHandlerMetadata() {

View File

@ -1,9 +1,16 @@
package net.engio.mbassy.subscription; package net.engio.mbassy.subscription;
import net.engio.mbassy.dispatch.*; import net.engio.mbassy.dispatch.AsynchronousHandlerInvocation;
import net.engio.mbassy.dispatch.EnvelopedMessageDispatcher;
import net.engio.mbassy.dispatch.FilteredMessageDispatcher;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.MessageDispatcher;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
/** /**
* Created with IntelliJ IDEA. * Created with IntelliJ IDEA.
*
* @author bennidi * @author bennidi
* Date: 11/16/12 * Date: 11/16/12
* Time: 10:39 AM * Time: 10:39 AM

View File

@ -1,6 +1,7 @@
package net.engio.mbassy; package net.engio.mbassy;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ConcurrentSet; import net.engio.mbassy.common.ConcurrentSet;
@ -24,11 +25,11 @@ import java.util.Random;
*/ */
public class ConcurrentSetTest extends UnitTest { public class ConcurrentSetTest extends UnitTest {
// Shared state
private int numberOfElements = 100000; private int numberOfElements = 100000;
private int numberOfThreads = 50; private int numberOfThreads = 50;
@Ignore("Currently fails when building as a suite with JDK 1.7.0_15 and Maven 3.0.5 on a Mac")
@Test @Test
public void testIteratorCleanup() { public void testIteratorCleanup() {
final HashSet<Object> persistingCandidates = new HashSet<Object>(); final HashSet<Object> persistingCandidates = new HashSet<Object>();
@ -44,7 +45,8 @@ public class ConcurrentSetTest extends UnitTest {
testSet.add(candidate); testSet.add(candidate);
} }
// this will remove all objects that have not been inserted into the set of persisting candidates // Remove/Garbage collect all objects that have not
// been inserted into the set of persisting candidates.
runGC(); runGC();
ConcurrentExecutor.runConcurrent(new Runnable() { ConcurrentExecutor.runConcurrent(new Runnable() {
@ -62,8 +64,6 @@ public class ConcurrentSetTest extends UnitTest {
for (Object test : testSet) { for (Object test : testSet) {
assertTrue(persistingCandidates.contains(test)); assertTrue(persistingCandidates.contains(test));
} }
} }

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.common; package net.engio.mbassy.common;
import junit.framework.Assert; import junit.framework.Assert;
import net.engio.mbassy.*; import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.BusConfiguration; import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.MBassador;
@ -26,6 +27,4 @@ public class MessageBusTest extends UnitTest{
bus.addErrorHandler(TestFailingHandler); bus.addErrorHandler(TestFailingHandler);
return bus; return bus;
} }
} }

View File

@ -13,12 +13,14 @@ import java.lang.ref.WeakReference;
*/ */
public class UnitTest { public class UnitTest {
// Internal state
private Runtime runtime = Runtime.getRuntime();
public void pause(long ms) { public void pause(long ms) {
try { try {
Thread.sleep(ms); Thread.sleep(ms);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. e.printStackTrace();
} }
} }
@ -30,7 +32,7 @@ public class UnitTest {
public void runGC() { public void runGC() {
WeakReference ref = new WeakReference<Object>(new Object()); WeakReference ref = new WeakReference<Object>(new Object());
while(ref.get() != null) { while(ref.get() != null) {
System.gc(); runtime.gc();
} }
} }