bugfix issue #3, added more test cases

This commit is contained in:
benni 2012-11-22 14:41:51 +01:00
parent 271a445a97
commit 989ed67285
35 changed files with 537 additions and 251 deletions

View File

@ -4,10 +4,14 @@ Mbassador
Mbassador is a very light-weight message (event) bus implementation following the publish subscribe pattern. It is designed
for ease of use and aims to be feature rich, extensible while preserving resource efficiency and performance.
It uses a specialized data structure to allow high throughput for concurrent access.
Read this documentation to get an overview of its features and how cool this message (event) bus actually is.
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
which also contains a partial list of the features of the compared implementations.
The current version is 1.0.1.RC
Table of contents:
+ [Features](#features)
+ [Usage](#usage)

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.0.RC</version>
<version>1.0.1.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern

View File

@ -12,23 +12,18 @@ import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
/**
* The base class for all message bus implementations.
*
* @param <T>
* @param <P>
*/
public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand> implements IMessageBus<T, P> {
// This predicate is used to find all message listeners (methods annotated with @Listener)
private static final IPredicate<Method> AllMessageListeners = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
}
};
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor;
// the metadata reader that is used to parse objects passed to the subscribe method
private MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type
@ -64,7 +59,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
public void run() {
while (true) {
try {
pendingMessages.take().execute();
pendingMessages.take().execute();
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
return;
@ -95,20 +90,23 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
protected abstract SubscriptionFactory getSubscriptionFactory();
protected void initialize(){}
protected void initialize() {
}
@Override
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers);
}
public void unsubscribe(Object listener) {
if (listener == null) return;
public boolean unsubscribe(Object listener) {
if (listener == null) return false;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if (subscriptions == null) return;
if (subscriptions == null) return false;
boolean isRemoved = false;
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
isRemoved = isRemoved || subscription.unsubscribe(listener);
}
return isRemoved;
}
@ -122,7 +120,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
synchronized (this) { // new subscriptions must be processed sequentially for each class
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = getListeners(listeningClass); // get all methods with subscriptions
List<Method> messageHandlers = metadataReader.getListeners(listeningClass); // get all methods with subscriptions
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
if (messageHandlers.isEmpty()) { // remember the class as non listening class
nonListeners.add(listeningClass);
@ -143,7 +141,9 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
}
// register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
for (Subscription sub : subscriptionsByListener){
sub.subscribe(listener);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -209,16 +209,12 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
return listener.getParameterTypes()[0];
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
private static List<Method> getListeners(Class<?> target) {
return ReflectionUtils.getMethods(AllMessageListeners, target);
}
public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers)
for (IPublicationErrorHandler errorHandler : errorHandlers){
errorHandler.handleError(error);
}
}
@Override

View File

@ -62,8 +62,10 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
* will not have any effect.
*
* @param listener
* @return true, if the listener was found and successfully removed
* false otherwise
*/
public void unsubscribe(Object listener);
public boolean unsubscribe(Object listener);
/**
*

View File

@ -3,9 +3,8 @@ package org.mbassy;
/**
* TODO. Insert class description here
* <p/>
* User: benni
* @author bennidi
* Date: 2/22/12
* Time: 5:03 PM
*/
public interface IPublicationErrorHandler {

View File

@ -6,7 +6,7 @@ import java.lang.reflect.Method;
* Publication errors are created when object publication fails for some reason and contain details
* as to the cause and location where they occured.
* <p/>
* User: benni
* @author bennidi
* Date: 2/22/12
* Time: 4:59 PM
*/

View File

@ -2,7 +2,7 @@ package org.mbassy;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/12/12
* Time: 8:44 PM
* To change this template use File | Settings | File Templates.

View File

@ -66,11 +66,11 @@ public class ConcurrentSet<T> implements Iterable<T> {
return this;
}
public ConcurrentSet<T> remove(T element) {
if (!entries.containsKey(element)) return this;
public boolean remove(T element) {
if (!entries.containsKey(element)) return false;
synchronized (this) {
Entry<T> listelement = entries.get(element);
if(listelement == null)return this;
if(listelement == null)return false;
if (listelement != head) {
listelement.remove();
} else {
@ -78,7 +78,7 @@ public class ConcurrentSet<T> implements Iterable<T> {
}
entries.remove(element);
}
return this;
return true;
}
public Iterator<T> iterator() {

View File

@ -2,7 +2,7 @@ package org.mbassy.common;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 10/22/12
* Time: 9:33 AM
* To change this template use File | Settings | File Templates.

View File

@ -1,64 +1,85 @@
package org.mbassy.common;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* User: benni
* Date: 2/16/12
* Time: 12:14 PM
* @author bennidi
* Date: 2/16/12
* Time: 12:14 PM
*/
public class ReflectionUtils {
public static List<Method> getMethods(IPredicate<Method> condition, Class<?> target) {
List<Method> methods = new LinkedList<Method>();
try {
for (Method method : target.getDeclaredMethods()) {
if (condition.apply(method)) {
methods.add(method);
}
}
} catch (Exception e) {
//nop
}
if (!target.equals(Object.class)) {
methods.addAll(getMethods(condition, target.getSuperclass()));
}
return methods;
}
public static List<Method> getMethods(IPredicate<Method> condition, Class<?> target) {
List<Method> methods = new LinkedList<Method>();
try {
for (Method method : target.getDeclaredMethods()) {
if (condition.apply(method)) {
methods.add(method);
}
}
} catch (Exception e) {
//nop
}
if (!target.equals(Object.class)) {
methods.addAll(getMethods(condition, target.getSuperclass()));
}
return methods;
}
public static List<Field> getFields(IPredicate<Field> condition, Class<?> target) {
List<Field> methods = new LinkedList<Field>();
try {
for (Field method : target.getDeclaredFields()) {
if (condition.apply(method)) {
methods.add(method);
}
}
} catch (Exception e) {
//nop
}
if (!target.equals(Object.class)) {
methods.addAll(getFields(condition, target.getSuperclass()));
}
return methods;
}
public static Method getOverridingMethod(Method overridingMethod, Class subclass) {
Class current = subclass;
while(!current.equals(overridingMethod.getDeclaringClass())){
try {
Method overridden = current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes());
return overridden;
} catch (NoSuchMethodException e) {
current = current.getSuperclass();
}
}
return null;
}
public static Object callMethod(Object o, final String methodName, Object... args) {
public static List<Method> withoutOverridenSuperclassMethods(List<Method> allMethods) {
List<Method> filtered = new LinkedList<Method>();
for (Method method : allMethods) {
if (!containsOverridingMethod(allMethods, method)) filtered.add(method);
}
return filtered;
}
if(o == null || methodName == null) {
return null;
}
public static boolean containsOverridingMethod(List<Method> allMethods, Method methodToCheck) {
for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) return true;
}
return false;
}
private static boolean isOverriddenBy(Method superclassMethod, Method subclassMethod) {
// if the declaring classes are the same or the subclass method is not defined in the subbclass
// hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod
if (superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass())
|| !superclassMethod.getDeclaringClass().isAssignableFrom(subclassMethod.getDeclaringClass())
|| !superclassMethod.getName().equals(subclassMethod.getName())) {
return false;
}
Class[] superClassMethodParameters = superclassMethod.getParameterTypes();
Class[] subClassMethodParameters = superclassMethod.getParameterTypes();
// method must specify the same number of parameters
if(subClassMethodParameters.length != subClassMethodParameters.length){
return false;
}
//the parameters must occur in the exact same order
for(int i = 0 ; i< subClassMethodParameters.length; i++){
if(!superClassMethodParameters[i].equals(subClassMethodParameters[i])){
return false;
}
}
return true;
}
Object res = null;
try {
Method m = o.getClass().getMethod(methodName);
res = m.invoke(o, args);
} catch (Exception e) {
//logger.warn("Not possible to get value", e);
}
return res;
}
}

View File

@ -5,7 +5,7 @@ import java.lang.annotation.*;
/**
* TODO. Insert class description here
* <p/>
* User: benni
* @author bennidi
* Date: 2/8/12
* Time: 3:35 PM
*/

View File

@ -4,7 +4,7 @@ package org.mbassy.listener;
* Object filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s)
*
* User: benni
* @author bennidi
* Date: 2/8/12
*/
public interface MessageFilter {

View File

@ -7,7 +7,7 @@ import org.mbassy.listener.MessageFilter;
import java.lang.reflect.Method;
/**
* User: benni
* @author bennidi
* Date: 11/14/12
*/
public class MessageHandlerMetadata {

View File

@ -1,18 +1,32 @@
package org.mbassy.listener;
import org.mbassy.common.IPredicate;
import org.mbassy.common.ReflectionUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/16/12
* Time: 10:22 AM
* To change this template use File | Settings | File Templates.
*/
public class MetadataReader {
// This predicate is used to find all message listeners (methods annotated with @Listener)
private static final IPredicate<Method> AllMessageHandlers = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
}
};
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
@ -40,4 +54,32 @@ public class MetadataReader {
MessageFilter[] filter = getFilter(config);
return new MessageHandlerMetadata(messageHandler, filter, config);
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<Method> getListeners(Class<?> target) {
List<Method> allMethods = ReflectionUtils.getMethods(AllMessageHandlers, target);
List<Method> handlers = new LinkedList<Method>();
for(Method listener : allMethods){
Method overriddenHandler = ReflectionUtils.getOverridingMethod(listener, target);
if(overriddenHandler != null && isHandler(overriddenHandler)){
handlers.add(overriddenHandler);
}
if(overriddenHandler == null){
handlers.add(listener);
}
}
return ReflectionUtils.withoutOverridenSuperclassMethods(handlers);
}
private static boolean isHandler(Method m){
Annotation[] annotations = m.getDeclaredAnnotations();
for(Annotation annotation : annotations){
if(annotation.equals(Listener.class))return true;
}
return false;
}
}

View File

@ -2,7 +2,7 @@ package org.mbassy.listener;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/16/12
* Time: 10:01 AM
* To change this template use File | Settings | File Templates.

View File

@ -10,7 +10,7 @@ import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:50 PM
* To change this template use File | Settings | File Templates.

View File

@ -12,7 +12,7 @@ import java.util.Iterator;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:48 PM
* To change this template use File | Settings | File Templates.

View File

@ -10,7 +10,7 @@ import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:49 PM
* To change this template use File | Settings | File Templates.

View File

@ -10,12 +10,15 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Comparator;
import java.util.UUID;
/**
* Subscription is a thread safe container for objects that contain message handlers
*/
public abstract class Subscription {
private UUID id = UUID.randomUUID();
private final Method handler;
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
@ -88,8 +91,8 @@ public abstract class Subscription {
}
public void unsubscribe(Object existingListener) {
listeners.remove(existingListener);
public boolean unsubscribe(Object existingListener) {
return listeners.remove(existingListener);
}
@ -97,7 +100,7 @@ public abstract class Subscription {
@Override
public int compare(Subscription o1, Subscription o2) {
int result = o1.getPriority() - o2.getPriority();
return result == 0 ? o1.handler.hashCode() - o2.handler.hashCode() : result;
return result == 0 ? o1.id.compareTo(o2.id): result;
}
};

View File

@ -4,7 +4,7 @@ import java.util.Collection;
import java.util.LinkedList;
/**
* User: benni
* @author bennidi
* Date: 11/16/12
*/
public class SubscriptionDeliveryRequest<T> {

View File

@ -8,7 +8,7 @@ import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/16/12
* Time: 10:39 AM
* To change this template use File | Settings | File Templates.

View File

@ -10,7 +10,7 @@ import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:48 PM
* To change this template use File | Settings | File Templates.

View File

@ -11,7 +11,7 @@ import java.util.Iterator;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:45 PM
* To change this template use File | Settings | File Templates.

View File

@ -10,7 +10,7 @@ import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/14/12
* Time: 3:49 PM
* To change this template use File | Settings | File Templates.

View File

@ -10,7 +10,7 @@ import java.util.Random;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/12/12
* Time: 3:02 PM
* To change this template use File | Settings | File Templates.

View File

@ -1,141 +1,147 @@
package org.mbassy;
import org.junit.Assert;
import org.junit.Test;
import org.mbassy.listener.Filter;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.Mode;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listeners.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
* @author bennidi
* Date: 2/8/12
* Date: 2/8/12
*/
public class MBassadorTest {
public class MBassadorTest extends UnitTest {
@Test
public void testSubscribe() throws InterruptedException {
public void testSubscribeSimple() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean listener = new EventingTestBean();
NonListeningBean nonListener = new NonListeningBean();
bus.subscribe(listener);
bus.subscribe(nonListener);
assertTrue(bus.unsubscribe(listener));
assertFalse(bus.unsubscribe(nonListener));
assertFalse(bus.unsubscribe(new EventingTestBean()));
}
}
@Test
public void testSubscribeConcurrent() throws Exception {
MBassador bus = new MBassador();
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
.create(100, EventingTestBean3.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
bus.publish(event);
bus.publish(subEvent);
pause(4000);
assertEquals(300, event.counter.get());
assertEquals(700, subEvent.counter.get());
}
@Test
public void testAsynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
bus.unsubscribe(new EventingTestBean());
}
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publishAsync(event);
bus.publishAsync(subEvent);
pause(2000);
assertTrue(event.counter.get() == 1000);
assertTrue(subEvent.counter.get() == 1000 * 2);
}
@Test
public void testUnSubscribe() throws InterruptedException {
public void testSynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
int listenerCount = 10;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
bus.unsubscribe(new EventingTestBean());
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publish(event);
bus.publish(subEvent);
assertEquals(i, event.counter.get());
pause(50);
assertEquals(i * 2, subEvent.counter.get());
}
}
@Test
public void testAsynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publishAsync(event);
bus.publishAsync(subEvent);
Thread.sleep(2000);
Assert.assertTrue(event.counter.get() == 1000);
Assert.assertTrue(subEvent.counter.get() == 1000 * 2);
}
@Test
public void testSynchronous() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 100;
List<EventingTestBean> persistentReferences = new ArrayList();
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
bus.publish(event);
bus.publish(subEvent);
Assert.assertEquals(i, event.counter.get());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
Assert.assertEquals(i * 2, subEvent.counter.get());
}
}
@Test
public void testConcurrentPublication() {
public void testConcurrentPublication() {
final MBassador bus = new MBassador();
final int listenerCount = 100;
final int concurenny = 20;
final int concurrency = 20;
final CopyOnWriteArrayList<TestEvent> testEvents = new CopyOnWriteArrayList<TestEvent>();
final CopyOnWriteArrayList<SubTestEvent> subtestEvents = new CopyOnWriteArrayList<SubTestEvent>();
final CopyOnWriteArrayList<EventingTestBean> persistentReferences = new CopyOnWriteArrayList<EventingTestBean>();
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for (int i = 0; i < listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
persistentReferences.add(bean);
bus.subscribe(bean);
}
long end = System.currentTimeMillis();
System.out.println("MBassador: Creating " + listenerCount + " listeners took " + (end - start) + " ms");
}
}, concurenny);
}
}
}, concurrency);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < listenerCount; i++) {
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
@ -145,67 +151,20 @@ public class MBassadorTest {
bus.publishAsync(event);
bus.publish(subEvent);
}
long end = System.currentTimeMillis();
System.out.println("MBassador: Publishing " + 2 * listenerCount + " events took " + (end - start) + " ms");
}
}, concurenny);
}, concurrency);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
pause(3000);
for (TestEvent event : testEvents) {
assertEquals(listenerCount * concurrency, event.counter.get());
}
for(TestEvent event : testEvents){
Assert.assertEquals(listenerCount * concurenny, event.counter.get());
for (SubTestEvent event : subtestEvents) {
assertEquals(listenerCount * concurrency * 2, event.counter.get());
}
for(SubTestEvent event : subtestEvents){
Assert.assertEquals(listenerCount * concurenny * 2, event.counter.get());
}
}
public static class TestEvent {
public AtomicInteger counter = new AtomicInteger();
}
public static class SubTestEvent extends TestEvent {
}
public class EventingTestBean {
// every event of type TestEvent or any subtype will be delivered
// to this listener
@Listener
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet();
}
// this handler will be invoked asynchronously
@Listener(priority = 0, dispatch = Mode.Asynchronous)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet();
}
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Listener(
priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(MessageFilter.None.class),@Filter(MessageFilter.All.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
}
}
}
}

View File

@ -0,0 +1,46 @@
package org.mbassy;
import java.util.List;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 11/22/12
*/
public class TestUtil {
public static void setup(final IMessageBus bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;
if(listeners.size() >= numberOfThreads){
partitionSize = (int)Math.floor(listeners.size() / numberOfThreads);
}
else{
partitionSize = 1;
numberOfThreads = listeners.size();
}
for(int i = 0; i < numberOfThreads; i++){
final int partitionStart = i * partitionSize;
final int partitionEnd = (i+1 < numberOfThreads)
? partitionStart + partitionSize + 1
: listeners.size();
setupUnits[i] = new Runnable() {
private List<Object> listenerSubset = listeners.subList(partitionStart, partitionEnd);
public void run() {
for(Object listener : listenerSubset){
bus.subscribe(listener);
}
}
};
}
ConcurrentExecutor.runConcurrent(setupUnits);
}
}

View File

@ -1,10 +1,12 @@
package org.mbassy;
import org.junit.Assert;
import java.lang.ref.WeakReference;
/**
* Created with IntelliJ IDEA.
* User: benni
* @author bennidi
* Date: 11/12/12
* Time: 3:16 PM
* To change this template use File | Settings | File Templates.
@ -31,4 +33,40 @@ public class UnitTest {
System.gc();
}
}
public void fail(String message) {
Assert.fail(message);
}
public void fail() {
Assert.fail();
}
public void assertTrue(Boolean condition) {
Assert.assertTrue(condition);
}
public void assertTrue(String message, Boolean condition) {
Assert.assertTrue(message, condition);
}
public void assertFalse(Boolean condition) {
Assert.assertFalse(condition);
}
public void assertNull(Object object) {
Assert.assertNull(object);
}
public void assertNotNull(Object object) {
Assert.assertNotNull(object);
}
public void assertFalse(String message, Boolean condition) {
Assert.assertFalse(message, condition);
}
public void assertEquals(Object expected, Object actual) {
Assert.assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,10 @@
package org.mbassy.events;
/**
*
* @author bennidi
* Date: 11/22/12
*/
public class SubTestEvent extends TestEvent {
}

View File

@ -0,0 +1,15 @@
package org.mbassy.events;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*
* @author bennidi
* Date: 11/22/12
*/
public class TestEvent {
public AtomicInteger counter = new AtomicInteger();
}

View File

@ -0,0 +1,42 @@
package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Filter;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MessageFilter;
import org.mbassy.listener.Mode;
/**
* Basic bean that defines some event handlers to be used for different unit testting scenarios
*
* @author bennidi
* Date: 11/22/12
*/
public class EventingTestBean {
// every event of type TestEvent or any subtype will be delivered
// to this listener
@Listener
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet();
}
// this handler will be invoked asynchronously
@Listener(priority = 0, dispatch = Mode.Asynchronous)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet();
}
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Listener(
priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(MessageFilter.None.class), @Filter(MessageFilter.All.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
}
}

View File

@ -0,0 +1,18 @@
package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
/**
* @author bennidi
* Date: 11/22/12
*/
public class EventingTestBean2 extends EventingTestBean{
// redefine the configuration for this handler
@Listener(dispatch = Mode.Synchronous)
public void handleSubTestEvent(SubTestEvent event) {
super.handleSubTestEvent(event);
}
}

View File

@ -0,0 +1,20 @@
package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
/**
* @author bennidi
* Date: 11/22/12
*/
public class EventingTestBean3 extends EventingTestBean2{
// this handler will be invoked asynchronously
@Listener(priority = 0, dispatch = Mode.Synchronous)
public void handleSubTestEventAgain(SubTestEvent event) {
event.counter.incrementAndGet();
}
}

View File

@ -0,0 +1,39 @@
package org.mbassy.listeners;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* This factory will create a list of beans according to some specified configuration.
* It can be used to setup different test scenarios.
*
* @author bennidi
* Date: 11/22/12
*/
public class ListenerFactory {
private Map<Class, Integer> requiredBeans = new HashMap<Class, Integer>();
public ListenerFactory create(int numberOfInstance, Class clazz){
requiredBeans.put(clazz, numberOfInstance);
return this;
}
public List<Object> build() throws Exception{
List<Object> beans = new LinkedList<Object>();
for(Class clazz : requiredBeans.keySet()){
int numberOfRequiredBeans = requiredBeans.get(clazz);
for(int i = 0; i < numberOfRequiredBeans; i++){
beans.add(clazz.newInstance());
}
}
return beans;
}
}

View File

@ -0,0 +1,32 @@
package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
/**
* This bean overrides all the handlers defined in its superclass. Since it does not specify any annotations
* it should be considered a message lister
*
* @author bennidi
* Date: 11/22/12
*/
public class NonListeningBean extends EventingTestBean{
@Override
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet(); // should never be called
}
@Override
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet(); // should never be called
}
@Override
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet(); // should never be called
}
}