initial project setup

This commit is contained in:
benni 2012-10-23 09:32:55 +02:00
parent 88ae82e3ca
commit 8c4d515c21
39 changed files with 1163 additions and 0 deletions

17
mbassador.iml Normal file
View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: junit:junit:4.10" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
</component>
</module>

49
pom.xml Normal file
View File

@ -0,0 +1,49 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.0.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Library for simple implementation of bidirectional conversions</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,27 @@
package org.mbassy;
import org.mbassy.filter.Filter;
import java.lang.annotation.*;
/**
* TODO. Insert class description here
* <p/>
* User: benni
* Date: 2/8/12
* Time: 3:35 PM
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Listener {
Filter[] value() default {}; // no filters by default
Dispatch mode() default Dispatch.Synchronous;
public static enum Dispatch{
Synchronous,Asynchronous
}
}

View File

@ -0,0 +1,427 @@
package org.mbassy;
import org.mbassy.filter.Filter;
import org.mbassy.filter.MessageFilter;
import org.mbassy.common.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
/**
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for each concrete message publication.
* A message publication is the publication of any message using one of the bus' publish(..) methods.
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously.
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Listener annotation.
* <p/>
* The bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched.
* <p/>
* Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
* class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the subscribed message handlers.
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
* a message handler may define filters to narrow the set of messages that it accepts.
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
* <p/>
* Removing a listener means removing all subscribed message handlers of that object. This remove operation
* immediately takes effect and on all running dispatch processes. A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
*
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
*
* @Author bennidi
* Date: 2/8/12
*/
public class MBassador<T>{
// This predicate is used to find all message listeners (methods annotated with @Listener)
private static final IPredicate<Method> AllMessageListeners = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
}
};
// This is the default error handler it will simply log to standard out and
// print stack trace if available
protected static final class ConsoleLogger implements IPublicationErrorHandler {
@Override
public void handleError(PublicationError error) {
System.out.println(error);
if (error.getCause() != null) error.getCause().printStackTrace();
}
};
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor = new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
// all subscriptions per message type
// this is the primary list for dispatching a specific message
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage = new HashMap(50);
// all subscriptions per messageHandler type
// this list provides access for subscribing and unsubsribing
private final Map<Class, Collection<Subscription>> subscriptionsPerListener = new HashMap(50);
// remember already processed classes that do not contain any listeners
private final Collection<Class> nonListeners = new HashSet();
// this handler will receive all errors that occur during message dispatch or message handling
private IPublicationErrorHandler errorHandler = new ConsoleLogger();
// all threads that are available for asynchronous message dispatching
private final CopyOnWriteArrayList<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final LinkedBlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>();
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
public void run() {
while (true) {
try {
publish(pendingMessages.take());
} catch (InterruptedException e) {
errorHandler.handleError(new PublicationError(e, "Asnchronous publication interupted", null, null, null));
return;
}
}
}
});
dispatchers.add(dispatcher);
dispatcher.start();
}
}
public MBassador(){
initDispatcherThreads(2);
}
public MBassador(int dispatcherThreadCount){
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
}
public void publishAsync(T message){
pendingMessages.offer(message);
}
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message.
*
* @param message
*/
public void publish(T message){
try {
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
for (Subscription subscription : subscriptions) subscription.publish(message);
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(message));
}
}
/**
* Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the messageHandler was still subscribed).
* A call to this method passing null, an already subscribed message or any message that does not define any listeners
* will not have any effect.
*
* @param listener
*/
public void unsubscribe(Object listener){
if (listener == null) return;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
public void subscribe(Object listener){
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass))
return; // early reject of known classes that do not participate in eventing
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially for each class
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = getListeners(listeningClass); // get all methods with subscriptions
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
if (messageHandlers.isEmpty()) { // remember the class as non listening class
nonListeners.add(listeningClass);
return;
}
// create subscriptions for all detected listeners
for (Method messageHandler : messageHandlers) {
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class));
Class eventType = getMessageType(messageHandler);
Subscription subscription = new Subscription(messageHandler, filter);
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
//updateMessageTypeHierarchy(eventType);
}
subscriptionsPerListener.put(listeningClass, subscriptionsByListener);
}
}
}
// register the message to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
}
public void setErrorHandler(IPublicationErrorHandler handler){
this.errorHandler = handler;
}
// obtain the set of subscriptions for the given message type
private Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
// TODO improve with cache
Collection<Subscription> subscriptions = new LinkedList<Subscription>();
if(subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : getSuperclasses(messageType)){
if(subscriptionsPerMessage.get(eventSuperType) != null){
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
return subscriptions;
}
private Collection<Class> getSuperclasses(Class from){
Collection<Class> superclasses = new LinkedList<Class>();
while(!from.equals(Object.class)){
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
return superclasses;
}
// associate a suscription with a message type
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArraySet<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
}
/*
private void updateMessageTypeHierarchy(Class messageType) {
for (Class existingEventType : subscriptionsPerMessage.keySet()) {
if (existingEventType.equals(messageType)) continue;
if (messageType.isAssignableFrom(existingEventType)) //message is super type of existing
messageTypeHierarchy.put(existingEventType, messageType);
else if (existingEventType.isAssignableFrom(messageType)) { // message is sub type of existing
messageTypeHierarchy.put(messageType, existingEventType); // add direct super type
messageTypeHierarchy.putAll(messageType, messageTypeHierarchy.get(existingEventType)); // add all super types of super type
}
}
}*/
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found nono or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
return true;
}
private static Class getMessageType(Method listener) {
return listener.getParameterTypes()[0];
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
private static List<Method> getListeners(Class<?> target) {
return ReflectionUtils.getMethods(AllMessageListeners, target);
}
// retrieve all instances of filters associated with the given subscription
private MessageFilter[] getFilter(Listener subscription) {
if (subscription.value().length == 0) return null;
MessageFilter[] filters = new MessageFilter[subscription.value().length];
int i = 0;
for (Filter filterDef : subscription.value()) {
MessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
try {
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error retrieving filter"));
}
}
filters[i] = filter;
i++;
}
return filters;
}
private void handlePublicationError(PublicationError error) {
errorHandler.handleError(error);
}
@Override
protected void finalize() throws Throwable {
super.finalize();
for(Thread dispatcher : dispatchers){
dispatcher.interrupt();
}
}
/**
* Subscription is a thread safe container for objects that contain message handlers
*
*/
private class Subscription {
private final MessageFilter[] filter;
private final Method messageHandler;
private ConcurrentLinkedBag<Object> listeners = new ConcurrentLinkedBag<Object>();
private boolean isAynchronous;
private Subscription(Method messageHandler, MessageFilter[] filter) {
this.messageHandler = messageHandler;
this.filter = filter;
this.messageHandler.setAccessible(true);
this.isAynchronous = messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous);
}
public void subscribe(Object o) {
listeners.add(o);
}
private void dispatch(final Object message, final Object listener){
if(isAynchronous){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
else{
invokeHandler(message, listener);
}
}
private void invokeHandler(final Object message, final Object listener){
try {
messageHandler.invoke(listener, message);
}catch(IllegalAccessException e){
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
messageHandler, listener, message));
}
catch(IllegalArgumentException e){
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + messageHandler.getParameterTypes()[0],
messageHandler, listener, message));
}
catch (InvocationTargetException e) {
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
messageHandler, listener, message));
}
catch (Throwable e) {
MBassador.this.handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
messageHandler, listener, message));
}
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
if(passesFilter(message, listener)) {
dispatch(message, listener);
}
}
}
private boolean passesFilter(Object message, Object listener) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, listener)) return false;
}
return true;
}
}
public void unsubscribe(Object existingListener) {
listeners.remove(existingListener);
}
}
}

View File

@ -0,0 +1,159 @@
package org.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
* structure. Remove operations can affect any running iterator such that a removed element that has not yet
* been reached by the iterator will not appear in that iterator anymore.
*
* The structure uses weak references to the elements. Iterators automatically perform cleanups of
* garbace collect objects during iteration.
* No dedicated maintenance operations need to be called or run in background.
*
*
* <p/>
* @author bennidi
* Date: 2/12/12
*/
public class ConcurrentLinkedBag<T> implements Iterable<T> {
private WeakHashMap<T, ListEntry<T>> entries = new WeakHashMap<T, ListEntry<T>>(); // maintain a map of entries for O(log n) lookup
private ListEntry<T> head; // reference to the first element
public ConcurrentLinkedBag<T> add(T element) {
if (element == null || entries.containsKey(element)) return this;
synchronized (this) {
insert(element);
}
return this;
}
private void insert(T element) {
if(head == null){
head = new ListEntry<T>(element);
}
else{
head = new ListEntry<T>(element, head);
}
entries.put(element, head);
}
public ConcurrentLinkedBag<T> addAll(Iterable<T> elements) {
for (T element : elements) {
if (element == null || entries.containsKey(element)) return this;
synchronized (this) {
insert(element);
}
}
return this;
}
public ConcurrentLinkedBag<T> remove(T element) {
if (!entries.containsKey(element)) return this;
synchronized (this) {
ListEntry<T> listelement = entries.get(element);
if(listelement != head){
listelement.remove();
}
else{
head = head.next();
}
entries.remove(element);
}
return this;
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private ListEntry<T> current = head;
public boolean hasNext() {
if(current == null) return false;
T value = current.getValue();
if(value == null){ // auto-removal of orphan references
remove();
return hasNext();
}
else{
return true;
}
}
public T next() {
if(current == null) return null;
T value = current.getValue();
if(value == null){ // auto-removal of orphan references
remove();
return next();
}
else{
current = current.next();
return value;
}
}
public void remove() {
if(current == null)return;
synchronized (ConcurrentLinkedBag.this){
current.remove();
current = current.next();}
}
};
}
public class ListEntry<T> {
private WeakReference<T> value;
private ListEntry<T> next;
private ListEntry<T> predecessor;
private ListEntry(T value) {
this.value = new WeakReference<T>(value);
}
private ListEntry(T value, ListEntry<T> next) {
this(value);
this.next = next;
next.predecessor = this;
}
public T getValue() {
return value.get();
}
public void remove(){
if(predecessor != null){
predecessor.setNext(next());
}
else if(next() != null){
next.predecessor = null;
}
}
public void setNext(ListEntry<T> element) {
this.next = element;
if(element != null)element.predecessor = this;
}
public ListEntry<T> next() {
return next;
}
public boolean hasNext() {
return next() != null;
}
}
}

View File

@ -0,0 +1,14 @@
package org.mbassy.common;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 10/22/12
* Time: 9:33 AM
* To change this template use File | Settings | File Templates.
*/
public interface IPredicate<T> {
public boolean apply(T target);
}

View File

@ -0,0 +1,13 @@
package org.mbassy.common;
/**
* TODO. Insert class description here
* <p/>
* User: benni
* Date: 2/22/12
* Time: 5:03 PM
*/
public interface IPublicationErrorHandler {
public void handleError(PublicationError error);
}

View File

@ -0,0 +1,98 @@
package org.mbassy.common;
import java.lang.reflect.Method;
/**
* Publication errors are created when object publication fails for some reason and contain details
* as to the cause and location where they occured.
* <p/>
* User: benni
* Date: 2/22/12
* Time: 4:59 PM
*/
public class PublicationError {
private Throwable cause;
private String message;
private Method listener;
private Object listeningObject;
private Object publishedObject;
public PublicationError(Throwable cause, String message, Method listener, Object listeningObject, Object publishedObject) {
this.cause = cause;
this.message = message;
this.listener = listener;
this.listeningObject = listeningObject;
this.publishedObject = publishedObject;
}
public PublicationError(){
super();
}
public Throwable getCause() {
return cause;
}
public PublicationError setCause(Throwable cause) {
this.cause = cause;
return this;
}
public String getMessage() {
return message;
}
public PublicationError setMessage(String message) {
this.message = message;
return this;
}
public Method getListener() {
return listener;
}
public PublicationError setListener(Method listener) {
this.listener = listener;
return this;
}
public Object getListeningObject() {
return listeningObject;
}
public PublicationError setListeningObject(Object listeningObject) {
this.listeningObject = listeningObject;
return this;
}
public Object getPublishedObject() {
return publishedObject;
}
public PublicationError setPublishedObject(Object publishedObject) {
this.publishedObject = publishedObject;
return this;
}
@Override
public String toString() {
return "PublicationError{" +
"\n" +
"\tcause=" + cause +
"\n" +
"\tmessage='" + message + '\'' +
"\n" +
"\tlistener=" + listener +
"\n" +
"\tlisteningObject=" + listeningObject +
"\n" +
"\tpublishedObject=" + publishedObject +
'}';
}
}

View File

@ -0,0 +1,70 @@
package org.mbassy.common;
import com.google.common.base.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
* User: benni
* Date: 2/16/12
* Time: 12:14 PM
*/
public class ReflectionUtils {
private static final Logger logger = LoggerFactory.getLogger(ReflectionUtils.class);
public static List<Method> getMethods(IPredicate<Method> condition, Class<?> target) {
List<Method> methods = new LinkedList<Method>();
try {
for (Method method : target.getDeclaredMethods()) {
if (condition.apply(method)) {
methods.add(method);
}
}
} catch (Exception e) {
//nop
}
if (!target.equals(Object.class)) {
methods.addAll(getMethods(condition, target.getSuperclass()));
}
return methods;
}
public static List<Field> getFields(Predicate<Field> condition, Class<?> target) {
List<Field> methods = new LinkedList<Field>();
try {
for (Field method : target.getDeclaredFields()) {
if (condition.apply(method)) {
methods.add(method);
}
}
} catch (Exception e) {
//nop
}
if (!target.equals(Object.class)) {
methods.addAll(getFields(condition, target.getSuperclass()));
}
return methods;
}
public static Object callMethod(Object o, final String methodName, Object... args) {
if(o == null || methodName == null) {
return null;
}
Object res = null;
try {
Method m = o.getClass().getMethod(methodName);
res = m.invoke(o, args);
} catch (Exception e) {
//logger.warn("Not possible to get value", e);
}
return res;
}
}

View File

@ -0,0 +1,23 @@
package org.mbassy.filter;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* THe filter annotation is used to add filters to message listeners.
* It references a class that implements the MessageFilter interface.
* The object filter will be used to check whether a message should be delivered
* to the message listener or not.
*
* <p/>
* @author benni
* Date: 2/14/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.ANNOTATION_TYPE})
public @interface Filter {
Class<? extends MessageFilter> value();
}

View File

@ -0,0 +1,31 @@
package org.mbassy.filter;
/**
* Object filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s)
*
* User: benni
* Date: 2/8/12
*/
public interface MessageFilter {
/**
* Evaluate the message and listener to ensure that the message should be handled by the listener
*
*
* @param event the event to be delivered
* @param listener the listener instance that would receive the event if it passes the filter
* @return
*/
public boolean accepts(Object event, Object listener);
public static final class All implements MessageFilter {
@Override
public boolean accepts(Object event, Object listener) {
return true;
}
}
}

View File

@ -0,0 +1,68 @@
package org.mbassy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* Run various tests concurrently. A given instance of runnable will be used to spawn and start
* as many threads as specified by an additional parameter or (if multiple runnables have been
* passed to the method) one thread for each runnable.
* <p/>
* Date: 2/14/12
*
* @Author bennidi
*/
public class ConcurrentExecutor {
public static void runConcurrent(final Runnable unit, int numberOfConcurrentExecutions) {
Runnable[] units = new Runnable[numberOfConcurrentExecutions];
// create the tasks and schedule for execution
for (int i = 0; i < numberOfConcurrentExecutions; i++) {
units[i] = unit;
}
runConcurrent(units);
}
public static void runConcurrent(final Runnable... units) {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> returnValues = new ArrayList<Future<Long>>();
// create the tasks and schedule for execution
for (final Runnable unit : units) {
Callable<Long> wrapper = new Callable<Long>() {
@Override
public Long call() throws Exception {
long start = System.currentTimeMillis();
unit.run();
return System.currentTimeMillis() - start;
}
};
returnValues.add(executor.submit(wrapper));
}
// wait until all tasks have been executed
try {
executor.shutdown();// tells the thread pool to execute all waiting tasks
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// unlikely that this will happen
e.printStackTrace();
}
// print results
for (Future<Long> result : returnValues)
try {
System.out.println("Execution of unit of work to " + result.get() + "ms.");
} catch (Exception e) {
//should never happen
// since the code waits until all tasks are processed
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,167 @@
package org.mbassy;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
* @author bennidi
* Date: 2/8/12
*/
public class MBassadorTest {
@Test
public void testAsynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publishAsync(event);
bus.publishAsync(subEvent);
Thread.sleep(2000);
Assert.assertTrue(event.counter.get() == 1000);
Assert.assertTrue(subEvent.counter.get() == 1000 * 2);
}
@Test
public void testSynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 100;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publish(event);
bus.publish(subEvent);
Assert.assertEquals(i, event.counter.get());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
Assert.assertEquals(i * 2, subEvent.counter.get());
}
}
@Test
public void testConcurrentPublication() {
final MBassador bus = new MBassador();
final int listenerCount = 100;
final int concurenny = 20;
final CopyOnWriteArrayList<TestEvent> testEvents = new CopyOnWriteArrayList<TestEvent>();
final CopyOnWriteArrayList<SubTestEvent> subtestEvents = new CopyOnWriteArrayList<SubTestEvent>();
final CopyOnWriteArrayList<EventingTestBean> persistentReferences = new CopyOnWriteArrayList<EventingTestBean>();
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
long end = System.currentTimeMillis();
System.out.println("MBassador: Creating " + listenerCount + " listeners took " + (end - start) + " ms");
}
}, concurenny);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < listenerCount; i++) {
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
testEvents.add(event);
subtestEvents.add(subEvent);
bus.publishAsync(event);
bus.publish(subEvent);
}
long end = System.currentTimeMillis();
System.out.println("MBassador: Publishing " + 2 * listenerCount + " events took " + (end - start) + " ms");
}
}, concurenny);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
for(TestEvent event : testEvents){
Assert.assertEquals(listenerCount * concurenny, event.counter.get());
}
for(SubTestEvent event : subtestEvents){
Assert.assertEquals(listenerCount * concurenny * 2, event.counter.get());
}
}
public static class TestEvent {
public AtomicInteger counter = new AtomicInteger();
}
public static class SubTestEvent extends TestEvent {
}
public class EventingTestBean {
@Listener
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet();
}
@Listener(mode = Listener.Dispatch.Asynchronous)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet();
}
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.