fixed #27 #29 #30 #31, higher test coverage, stabilized tests,

This commit is contained in:
bennidi 2013-05-26 17:38:16 +02:00
parent 55d9ad8daf
commit 6100c65718
41 changed files with 1407 additions and 484 deletions

View File

@ -1,5 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@ -43,6 +45,8 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch(Throwable t){
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", null, null, null));
}
}
}
@ -64,7 +68,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPost
}
}
// this method enqueues a message delivery request
// this method queues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(request, timeout, unit)

View File

@ -34,7 +34,6 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory().setBus(this));
this.publicationFactory = configuration.getMessagePublicationFactory();
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
}
protected MessagePublication.Factory getPublicationFactory() {

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PubSubSupport;
import java.util.Collection;
@ -10,32 +11,9 @@ import java.util.Collection;
* @author bennidi
* Date: 3/29/13
*/
public interface ISyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> {
public interface ISyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> extends PubSubSupport<T>{
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
void subscribe(Object listener);
/**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the message listener was still subscribed).
* <p/>
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
* handlers will not have any effect and is silently ignored.
*
* @param listener
* @return true, if the listener was found and successfully removed
* false otherwise
*/
boolean unsubscribe(Object listener);
/**
* @param message
* @return

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.bus;
/**
*
* @author bennidi
* Date: 5/25/13
*/
public interface PostCommand {
}

View File

@ -53,8 +53,8 @@ public class ReflectionUtils {
collectInterfaces(from, superclasses);
while (!from.equals(Object.class) && !from.isInterface()) {
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
collectInterfaces(from, superclasses);
}
return superclasses;
}

View File

@ -8,6 +8,7 @@ import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
/**
@ -58,12 +59,20 @@ public class SubscriptionFactory {
return dispatcher;
}
protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws Exception {
protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws MessageBusException {
Class<? extends HandlerInvocation> invocation = context.getHandlerMetadata().getHandlerInvocation();
if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){
throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class");
}
Constructor<? extends IHandlerInvocation> constructor = invocation.getConstructor(SubscriptionContext.class);
return constructor.newInstance(context);
try {
Constructor<? extends IHandlerInvocation> constructor = invocation.getConstructor(SubscriptionContext.class);
return constructor.newInstance(context);
} catch (NoSuchMethodException e) {
throw new MessageBusException("The provided handler invocation did not specify the necessary constructor "
+ invocation.getSimpleName() + "(SubscriptionContext);", e);
} catch (Exception e) {
throw new MessageBusException("Could not instantiate the provided handler invocation "
+ invocation.getSimpleName(), e);
}
}
}

View File

@ -14,7 +14,7 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
StrongConcurrentSetTest.class,
WeakConcurrentSetTest.class,
AsynchronousMessageBusTest.class,
MBassadorTest.class,
SyncBusTest.MBassadorTest.class,
SyncBusTest.SyncMessageBusTest.class,
FilterTest.class,

View File

@ -1,113 +0,0 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.messages.TestMessage2;
import net.engio.mbassy.listeners.*;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
* @author bennidi
* Date: 2/8/12
*/
public class AsynchronousMessageBusTest extends MessageBusTest {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
// evaluation is run
private int processingTimeInMS = 4000;
@Test
public void testAsynchronousMessagePublication() throws Exception {
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
.create(10000, EventingTestBean3.class)
.create(10000, Object.class)
.create(10000, NonListeningBean.class)
.create(10000, MultiEventHandler.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
TestMessage2 message2 = new TestMessage2();
bus.publishAsync(message);
bus.publishAsync(subMessage);
bus.publishAsync(message2);
pause(processingTimeInMS);
assertEquals(50000, message.counter.get());
assertEquals(80000, subMessage.counter.get());
assertEquals(20000, message2.counter.get());
}
@Test
public void testConcurrentMixedMessagePublication() throws Exception {
final CopyOnWriteArrayList<TestMessage> testMessages = new CopyOnWriteArrayList<TestMessage>();
final CopyOnWriteArrayList<SubTestMessage> subtestMessages = new CopyOnWriteArrayList<SubTestMessage>();
final int eventLoopsPerTHread = 100;
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
.create(10000, EventingTestBean3.class)
.create(10000, Object.class)
.create(10000, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for (int i = 0; i < eventLoopsPerTHread; i++) {
TestMessage message = new TestMessage();
SubTestMessage subMessage = new SubTestMessage();
testMessages.add(message);
subtestMessages.add(subMessage);
bus.publishAsync(message);
bus.publish(subMessage);
}
}
}, 10);
pause(processingTimeInMS);
for (TestMessage message : testMessages) {
assertEquals(30000, message.counter.get());
}
for (SubTestMessage message : subtestMessages) {
assertEquals(70000, message.counter.get());
}
}
}

View File

@ -1,9 +1,9 @@
package net.engio.mbassy;
import junit.framework.Assert;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.common.UnitTest;
import org.junit.Test;
import java.util.*;
@ -20,7 +20,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* @author bennidi
* Date: 11/12/12
*/
public abstract class ConcurrentSetTest extends UnitTest {
public abstract class ConcurrentSetTest extends AssertSupport {
// Shared state
protected final int numberOfElements = 100000;
@ -38,7 +38,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
final IConcurrentSet testSetWeak = createSet();
Random rand = new Random();
// build set of distinct objects and list of duplicates
// getAll set of distinct objects and list of duplicates
Object candidate = new Object();
for (int i = 0; i < numberOfElements; i++) {
if (rand.nextInt() % 3 == 0) {
@ -72,7 +72,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
// build set of distinct objects and mark a subset of those for removal
// getAll set of distinct objects and mark a subset of those for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
@ -81,7 +81,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
}
}
// build the test set from the set of candidates
// getAll the test set from the set of candidates
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
@ -119,7 +119,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
// build set of candidates and mark subset for removal
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
@ -128,7 +128,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
}
}
// build test set by adding the candidates
// getAll test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
@ -158,14 +158,14 @@ public abstract class ConcurrentSetTest extends UnitTest {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
// build set of candidates and mark subset for removal
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSetWeak.add(candidate);
}
// build test set by adding the candidates
// getAll test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
@ -190,14 +190,14 @@ public abstract class ConcurrentSetTest extends UnitTest {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet setUnderTest = createSet();
// build set of candidates and mark subset for removal
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
setUnderTest.add(candidate);
}
// build test set by adding the candidates
// getAll test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override

View File

@ -35,7 +35,7 @@ public class FilterTest extends MessageBusTest {
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.build();
List<Object> listeners = listenerFactory.getAll();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
@ -60,7 +60,7 @@ public class FilterTest extends MessageBusTest {
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.build();
List<Object> listeners = listenerFactory.getAll();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);

View File

@ -0,0 +1,166 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.listeners.ListenerFactory;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
* @author bennidi
* Date: 2/8/12
*/
public class MBassadorTest extends MessageBusTest {
@Test
public void testSynchronousMessagePublication() throws Exception {
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.AsyncListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, Object.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
pause(processingTimeInMS);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class));
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class));
}
@Test
public void testAsynchronousMessagePublication() throws Exception {
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.AsyncListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, Object.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(standardMessage).asynchronously();
bus.post(multipartMessage).asynchronously();
bus.post(MessageTypes.Simple).asynchronously();
pause(processingTimeInMS);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class));
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class));
}
@Test
public void testExceptionInHandlerInvocation(){
final AtomicInteger exceptionCount = new AtomicInteger(0);
IPublicationErrorHandler ExceptionCounter = new IPublicationErrorHandler() {
@Override
public void handleError(PublicationError error) {
exceptionCount.incrementAndGet();
}
};
final MBassador bus = new MBassador(new BusConfiguration());
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
bus.post(new StandardMessage()).asynchronously();
}
};
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
pause(processingTimeInMS);
assertEquals(InstancesPerListener, exceptionCount.get());
// multi threaded
exceptionCount.set(0);
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
pause(processingTimeInMS);
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy;
import net.engio.mbassy.common.AssertSupport;
import org.junit.Test;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MessageListenerMetadata;
@ -16,12 +16,11 @@ import java.util.Map;
import static net.engio.mbassy.listener.MessageListenerMetadata.ForMessage;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 12/16/12
*/
public class MetadataReaderTest extends UnitTest {
public class MetadataReaderTest extends AssertSupport {
private MetadataReader reader = new MetadataReader();

View File

@ -1,167 +1,209 @@
package net.engio.mbassy;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.IPredicate;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.common.SubscriptionValidator;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.messages.ITestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.messages.*;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import org.junit.Test;
import java.util.*;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/12/13
*/
public class SubscriptionManagerTest extends UnitTest{
public class SubscriptionManagerTest extends AssertSupport {
private static final int InstancesPerListener = 5000;
private static final int ConcurrentUnits = 10;
@Test
public void testSimpleSynchronousHandler(){
final SubscriptionManager subMan = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
final Set listeners = Collections.synchronizedSet(new HashSet());
final int concurrentUnits = 5;
final int numberOfLoops = 100;
final int numberOfListeners = numberOfLoops * concurrentUnits;
public void testIMessageListener(){
ListenerFactory listeners = listeners(
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
IMessageListener.DisabledListener.class,
IMessageListener.NoSubtypesListener.class);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(int i = 0 ; i < numberOfLoops ; i++){
SimpleSynchronousMessageHandler
listener1 = new SimpleSynchronousMessageHandler();
SimpleSynchronousMessageHandler2 listener2 = new SimpleSynchronousMessageHandler2();
subMan.subscribe(listener1);
subMan.subscribe(listener2);
listeners.add(listener1);
listeners.add(listener2);
}
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMessageListener.DefaultListener.class).handles(IMessage.class,
AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.AsyncListener.class).handles(IMessage.class,
AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
}
}, concurrentUnits);
runTestWith(listeners, expectedSubscriptions);
}
SubscriptionValidator validator = new SubscriptionValidator();
validator.expect(numberOfListeners, SimpleSynchronousMessageHandler.class, ITestMessage.class);
validator.expect(numberOfListeners, SimpleSynchronousMessageHandler2.class, ITestMessage.class);
validator.expect(numberOfListeners, SimpleSynchronousMessageHandler.class, TestMessage.class);
validator.expect(numberOfListeners, SimpleSynchronousMessageHandler2.class, TestMessage.class);
@Test
public void testAbstractMessageListener(){
ListenerFactory listeners = listeners(
AbstractMessageListener.DefaultListener.class,
AbstractMessageListener.AsyncListener.class,
AbstractMessageListener.DisabledListener.class,
AbstractMessageListener.NoSubtypesListener.class);
validator.validate(subMan);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class)
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class)
.listener(AbstractMessageListener.AsyncListener.class).handles(StandardMessage.class, AbstractMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMessagesListener(){
ListenerFactory listeners = listeners(
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.DisabledListener.class,
MessagesListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class)
.listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMultipartMessageListener(){
ListenerFactory listeners = listeners(
MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.AsyncListener.class,
MultipartMessageListener.DisabledListener.class,
MultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testIMultipartMessageListener(){
ListenerFactory listeners = listeners(
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.DisabledListener.class,
IMultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testStandardMessageListener(){
ListenerFactory listeners = listeners(
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.DisabledListener.class,
StandardMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.AsyncListener.class).handles(StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testICountableListener(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
ICountableListener.DisabledListener.class,
ICountableListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class).handles(ICountable.class)
.listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(ICountableListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testMultipleMessageListeners(){
ListenerFactory listeners = listeners(
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
ICountableListener.DisabledListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.DisabledListener.class,
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.DisabledListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(ICountableListener.AsyncListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class)
.listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test
public void testStrongListenerSubscription() throws Exception {
ListenerFactory listeners = listeners(CustomInvocationListener.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
listeners.clear();
runGC();
Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(StandardMessage.class);
assertEquals(1, subscriptions.size());
for(Subscription sub : subscriptions)
assertEquals(InstancesPerListener, sub.size());
}
class SubscriptionValidator{
private List<Entry> validations = new LinkedList<Entry>();
private Set<Class> messageTypes = new HashSet<Class>();
private Set<Class> subsribers = new HashSet<Class>();
public SubscriptionValidator expect(int numberOfSubscriber, Class subscriber, Class messageType){
validations.add(new Entry(messageType, numberOfSubscriber, subscriber));
messageTypes.add(messageType);
subsribers.add(subscriber);
return this;
private ListenerFactory listeners(Class ...listeners){
ListenerFactory factory = new ListenerFactory();
for(Class listener : listeners){
factory.create(InstancesPerListener, listener);
}
return factory;
}
public void validate(SubscriptionManager manager){
for(Class messageType : messageTypes){
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
Collection<Entry> validationEntries = getEntries(EntriesByMessageType(messageType));
assertEquals(subscriptions.size(), validationEntries.size());
for(Entry validationEntry : validationEntries){
Subscription matchingSub = null;
// one of the subscriptions must belong to the subscriber type
for(Subscription sub : subscriptions){
if(sub.belongsTo(validationEntry.subscriber)){
matchingSub = sub;
break;
}
}
assertNotNull(matchingSub);
assertEquals(validationEntry.numberOfSubscribers, matchingSub.size());
}
}
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
private Collection<Entry> getEntries(IPredicate<Entry> filter){
Collection<Entry> matching = new LinkedList<Entry>();
for (Entry validationEntry : validations){
if(filter.apply(validationEntry))matching.add(validationEntry);
}
return matching;
}
validator.validate(subscriptionManager);
private IPredicate<Entry> EntriesByMessageType(final Class messageType){
return new IPredicate<Entry>() {
@Override
public boolean apply(Entry target) {
return target.messageType.equals(messageType);
}
};
}
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), ConcurrentUnits);
private IPredicate<Entry> EntriesBySubscriberType(final Class subscriberType){
return new IPredicate<Entry>() {
@Override
public boolean apply(Entry target) {
return target.subscriber.equals(subscriberType);
}
};
}
private class Entry{
private int numberOfSubscribers;
private Class subscriber;
private Class messageType;
private Entry(Class messageType, int numberOfSubscribers, Class subscriber) {
this.messageType = messageType;
this.numberOfSubscribers = numberOfSubscribers;
this.subscriber = subscriber;
}
}
listeners.clear();
validator.validate(subscriptionManager);
}
static class SimpleSynchronousMessageHandler{
@Handler
public void handle(TestMessage message) {
}
@Handler
public void handle(ITestMessage message) {
}
}
static class SimpleSynchronousMessageHandler2{
@Handler
public void handle(TestMessage message) {
}
@Handler
public void handle(ITestMessage message) {
}
}
}

View File

@ -1,21 +1,16 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.*;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.messages.ITestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.messages.TestMessage3;
import net.engio.mbassy.subscription.SubscriptionContext;
import org.junit.Assert;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
@ -25,152 +20,117 @@ import java.util.List;
*/
public abstract class SyncBusTest extends MessageBusTest {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
// evaluation is run
private int processingTimeInMS = 4000;
protected abstract ISyncMessageBus getSyncMessageBus();
@Test
public void testSynchronousMessagePublication() throws Exception {
ISyncMessageBus bus = getSyncMessageBus();
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, MessageListener1.class)
.create(10000, MessageListener2.class)
.create(10000, MessageListener3.class)
.create(10000, Object.class)
.create(10000, NonListeningBean.class);
final ISyncMessageBus bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, Object.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
bus.post(message).now();
bus.post(subMessage).now();
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
bus.post(MessageTypes.Multipart).now();
pause(processingTimeInMS);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
}
};
assertEquals(30000, message.counter.get());
assertEquals(70000, subMessage.counter.get());
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
// multi threaded
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(MessagesListener.DefaultListener.class));
}
@Test
public void testStrongListenerSubscription() throws Exception {
ISyncMessageBus bus = getSyncMessageBus();
for(int i = 0; i< 10000; i++){
bus.subscribe(new MessageListener2());
}
runGC();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.post(message).now();
bus.post(subMessage).now();
pause(processingTimeInMS);
assertEquals(10000, message.counter.get());
assertEquals(20000, subMessage.counter.get());
}
protected abstract ISyncMessageBus getSyncMessageBus();
@Test
public void testHandlerUsingInterface() {
MBassador<ITestMessage> bus = new MBassador<ITestMessage>(BusConfiguration.Default());
bus.subscribe(new InterfaceMessageListener());
bus.publish(new TestMessage3());
public void testExceptionInHandlerInvocation(){
final AtomicInteger exceptionCount = new AtomicInteger(0);
IPublicationErrorHandler ExceptionCounter = new IPublicationErrorHandler() {
@Override
public void handleError(PublicationError error) {
exceptionCount.incrementAndGet();
}
};
final ISyncMessageBus bus = getSyncMessageBus();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publish = new Runnable() {
@Override
public void run() {
bus.post(new StandardMessage()).now();
}
};
// single threaded
ConcurrentExecutor.runConcurrent(publish, 1);
exceptionCount.set(0);
// multi threaded
ConcurrentExecutor.runConcurrent(publish, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
}
@Listener(references = References.Strong)
static class InterfaceMessageListener{
@Handler
public void handleFoo(ITestMessage f) {
Assert.assertTrue(f instanceof TestMessage3);
}
@Handler
public void handleDead(DeadMessage d) {
Assert.fail("This class should handle this message appropriately!");
}
}
@Test
public void testCustomHandlerInvocation(){
final ISyncMessageBus bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, CustomInvocationListener.class)
.create(InstancesPerListener, Object.class);
public static class MessageListener1 {
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
// every event of type TestEvent or any subtype will be delivered
// to this listener
@Handler
public void handleTestEvent(TestMessage message) {
message.counter.incrementAndGet();
}
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
// this handler will be invoked asynchronously
@Handler(priority = 0, invocation = HandleSubTestEventInvocation.class)
public void handleSubTestEvent(SubTestMessage message) {
message.counter.incrementAndGet();
}
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Handler(
priority = 10,
delivery = Invoke.Synchronously,
filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)})
public void handleFiltered(SubTestMessage message) {
message.counter.incrementAndGet();
}
assertEquals(InstancesPerListener * 2, standardMessage.getTimesHandled(CustomInvocationListener.class));
assertEquals(0, multipartMessage.getTimesHandled(CustomInvocationListener.class));
assertEquals(0, MessageTypes.Simple.getTimesHandled(CustomInvocationListener.class));
}
};
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
}
public static class HandleSubTestEventInvocation extends HandlerInvocation<MessageListener1, SubTestMessage> {
public HandleSubTestEventInvocation(SubscriptionContext context) {
super(context);
}
@Override
public void invoke(MessageListener1 listener, SubTestMessage message) {
listener.handleSubTestEvent(message);
}
}
@Listener(references = References.Strong)
public static class MessageListener2 extends net.engio.mbassy.listeners.EventingTestBean {
// redefine the configuration for this handler
@Handler(delivery = Invoke.Synchronously)
public void handleSubTestEvent(SubTestMessage message) {
super.handleSubTestEvent(message);
}
}
@Listener(references = References.Strong)
public static class MessageListener3 extends net.engio.mbassy.listeners.EventingTestBean2 {
// this handler will be invoked asynchronously
@Handler(priority = 0, delivery = Invoke.Synchronously)
public void handleSubTestEventAgain(SubTestMessage message) {
message.counter.incrementAndGet();
}
// multi threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
}

View File

@ -21,17 +21,17 @@ import java.util.List;
public class SynchronizedHandlerTest extends MessageBusTest {
private static int incrementsPerHandler = 10000;
private static int incrementsPerMessage = 10000;
private static int numberOfMessages = 1000;
private static int numberOfHandlers = 1000;
private static int numberOfListeners = 1000;
@Test
public void testSynchronizedWithSynchronousInvocation(){
List<SynchronizedMessageHandlerSync> handlers = new LinkedList<SynchronizedMessageHandlerSync>();
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
IMessageBus bus = getBus(BusConfiguration.Default()
.setNumberOfMessageDispatchers(6));
for(int i = 0; i < numberOfHandlers; i++){
SynchronizedMessageHandlerSync handler = new SynchronizedMessageHandlerSync();
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
handlers.add(handler);
bus.subscribe(handler);
}
@ -40,23 +40,24 @@ public class SynchronizedHandlerTest extends MessageBusTest {
for(int i = 0; i < numberOfMessages; i++){
publication = bus.post(new Object()).asynchronously();
}
// wait for last publication
while (!publication.isFinished()){
pause(2000);
pause(100);
}
for(SynchronizedMessageHandlerSync handler : handlers){
assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter);
for(SynchronizedWithSynchronousDelivery handler : handlers){
assertEquals(incrementsPerMessage * numberOfMessages, handler.counter);
}
}
@Test
public void testSynchronizedWithAsSynchronousInvocation(){
List<SynchronizedMessageHandlerAsyn> handlers = new LinkedList<SynchronizedMessageHandlerAsyn>();
List<SynchronizedWithAsynchronousDelivery> handlers = new LinkedList<SynchronizedWithAsynchronousDelivery>();
IMessageBus bus = getBus(BusConfiguration.Default()
.setNumberOfMessageDispatchers(6));
for(int i = 0; i < numberOfHandlers; i++){
SynchronizedMessageHandlerAsyn handler = new SynchronizedMessageHandlerAsyn();
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithAsynchronousDelivery handler = new SynchronizedWithAsynchronousDelivery();
handlers.add(handler);
bus.subscribe(handler);
}
@ -67,35 +68,35 @@ public class SynchronizedHandlerTest extends MessageBusTest {
pause(10000);
for(SynchronizedMessageHandlerAsyn handler : handlers){
assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter);
for(SynchronizedWithAsynchronousDelivery handler : handlers){
assertEquals(incrementsPerMessage * numberOfMessages, handler.counter);
}
}
public static class SynchronizedMessageHandlerSync{
public static class SynchronizedWithSynchronousDelivery {
private int Counter = 0;
private int counter = 0;
@Handler
@Synchronized
public void handleMessage(Object o){
for(int i = 0; i < incrementsPerHandler; i++){
Counter++;
for(int i = 0; i < incrementsPerMessage; i++){
counter++;
}
}
}
public static class SynchronizedMessageHandlerAsyn{
public static class SynchronizedWithAsynchronousDelivery {
private int Counter = 0;
private int counter = 0;
@Handler(delivery = Invoke.Asynchronously)
@Synchronized
public void handleMessage(Object o){
for(int i = 0; i < incrementsPerHandler; i++){
Counter++;
for(int i = 0; i < incrementsPerMessage; i++){
counter++;
}
}

View File

@ -90,7 +90,7 @@ public class ListenerSubscriptionTest extends MessageBusTest{
.create(10000, Object.class)
.create(10000, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
List<Object> listeners = listenerFactory.getAll();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);

View File

@ -5,13 +5,9 @@ import org.junit.Assert;
import java.lang.ref.WeakReference;
/**
* Created with IntelliJ IDEA.
* @author bennidi
* Date: 11/12/12
* Time: 3:16 PM
* To change this template use File | Settings | File Templates.
*/
public class UnitTest {
public class AssertSupport {
// Internal state
private Runtime runtime = Runtime.getRuntime();
@ -31,7 +27,9 @@ public class UnitTest {
public void runGC() {
WeakReference ref = new WeakReference<Object>(new Object());
pause(100);
while(ref.get() != null) {
pause(10);
runtime.gc();
}
}

View File

@ -43,6 +43,7 @@ public class ConcurrentExecutor {
returnValues.add(executor.submit(wrapper));
}
// wait until all tasks have been executed
try {
executor.shutdown();// tells the thread pool to execute all waiting tasks
@ -51,6 +52,15 @@ public class ConcurrentExecutor {
// unlikely that this will happen
e.printStackTrace();
}
for(Future task : returnValues){
try {
task.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -6,6 +6,8 @@ import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.messages.MessageTypes;
import org.junit.Before;
/**
* A base test that provides a factory for message bus that makes tests fail if any
@ -14,7 +16,14 @@ import net.engio.mbassy.bus.MBassador;
* @author bennidi
* Date: 3/2/13
*/
public class MessageBusTest<Bus extends ISyncMessageBus> extends UnitTest {
public abstract class MessageBusTest<Bus extends ISyncMessageBus> extends AssertSupport {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
// evaluation is run
protected static final int processingTimeInMS = 6000;
protected static final int InstancesPerListener = 5000;
protected static final int ConcurrentUnits = 10;
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override
@ -23,6 +32,13 @@ public class MessageBusTest<Bus extends ISyncMessageBus> extends UnitTest {
}
};
@Before
public void setUp(){
for(MessageTypes mes : MessageTypes.values())
mes.reset();
}
public MBassador getBus(BusConfiguration configuration) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);

View File

@ -0,0 +1,117 @@
package net.engio.mbassy.common;
import net.engio.mbassy.listeners.ListenerFactory;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.*;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/25/13
*/
public class SubscriptionValidator extends AssertSupport{
private List<ValidationEntry> validations = new LinkedList<ValidationEntry>();
private Set<Class> messageTypes = new HashSet<Class>();
private Set<Class> subscribers = new HashSet<Class>();
private ListenerFactory subscribedListener;
public SubscriptionValidator(ListenerFactory subscribedListener) {
this.subscribedListener = subscribedListener;
}
public Expectation listener(Class subscriber){
return new Expectation(subscriber);
}
private SubscriptionValidator expect(Class subscriber, Class messageType){
validations.add(new ValidationEntry(messageType, subscriber));
messageTypes.add(messageType);
subscribers.add(subscriber);
return this;
}
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager){
for(Class messageType : messageTypes){
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
Collection<ValidationEntry> validationEntries = getEntries(EntriesByMessageType(messageType));
assertEquals(subscriptions.size(), validationEntries.size());
for(ValidationEntry validationValidationEntry : validationEntries){
Subscription matchingSub = null;
// one of the subscriptions must belong to the subscriber type
for(Subscription sub : subscriptions){
if(sub.belongsTo(validationValidationEntry.subscriber)){
matchingSub = sub;
break;
}
}
assertNotNull(matchingSub);
assertEquals(subscribedListener.getNumberOfListeners(validationValidationEntry.subscriber), matchingSub.size());
}
}
}
private Collection<ValidationEntry> getEntries(IPredicate<ValidationEntry> filter){
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
for (ValidationEntry validationValidationEntry : validations){
if(filter.apply(validationValidationEntry))matching.add(validationValidationEntry);
}
return matching;
}
private IPredicate<ValidationEntry> EntriesByMessageType(final Class messageType){
return new IPredicate<ValidationEntry>() {
@Override
public boolean apply(ValidationEntry target) {
return target.messageType.equals(messageType);
}
};
}
private IPredicate<ValidationEntry> EntriesBySubscriberType(final Class subscriberType){
return new IPredicate<ValidationEntry>() {
@Override
public boolean apply(ValidationEntry target) {
return target.subscriber.equals(subscriberType);
}
};
}
public class Expectation{
private Class listener;
private Expectation(Class listener) {
this.listener = listener;
}
public SubscriptionValidator handles(Class ...messages){
for(Class message : messages)
expect(listener, message);
return SubscriptionValidator.this;
}
}
private class ValidationEntry {
private Class subscriber;
private Class messageType;
private ValidationEntry(Class messageType, Class subscriber) {
this.messageType = messageType;
this.subscriber = subscriber;
}
}
}

View File

@ -1,8 +1,10 @@
package net.engio.mbassy.common;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.listeners.ListenerFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.Iterator;
import java.util.List;
/**
@ -14,6 +16,58 @@ import java.util.List;
public class TestUtil {
public static Runnable subscriber(final SubscriptionManager manager, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
Object next;
while((next = source.next()) != null){
manager.subscribe(next);
}
}
};
}
public static Runnable unsubscriber(final SubscriptionManager manager, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
Object next;
while((next = source.next()) != null){
manager.unsubscribe(next);
}
}
};
}
public static Runnable subscriber(final ISyncMessageBus bus, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
Object next;
while((next = source.next()) != null){
bus.subscribe(next);
}
}
};
}
public static Runnable unsubscriber(final ISyncMessageBus bus, final ListenerFactory listeners){
final Iterator source = listeners.iterator();
return new Runnable() {
@Override
public void run() {
Object next;
while((next = source.next()) != null){
bus.unsubscribe(next);
}
}
};
}
public static void setup(final ISyncMessageBus bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.AbstractMessage;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class AbstractMessageListener {
private static abstract class BaseListener {
@Handler
public void handle(AbstractMessage message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(AbstractMessage message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(AbstractMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(AbstractMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(AbstractMessage message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,37 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.StandardMessage;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* @author bennidi
* Date: 5/25/13
*/
@Listener(references = References.Strong)
public class CustomInvocationListener {
// this handler will be invoked asynchronously
@Handler(invocation = HandleSubTestEventInvocation.class)
public void handle(StandardMessage message) {
message.handled(this.getClass());
message.handled(this.getClass());
}
public static class HandleSubTestEventInvocation extends HandlerInvocation<CustomInvocationListener, StandardMessage> {
public HandleSubTestEventInvocation(SubscriptionContext context) {
super(context);
}
@Override
public void invoke(CustomInvocationListener listener, StandardMessage message) {
listener.handle(message);
}
}
}

View File

@ -0,0 +1,23 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.StandardMessage;
/**
* @author bennidi
* Date: 5/25/13
*/
@Listener(references = References.Strong)
public class ExceptionThrowingListener {
// this handler will be invoked asynchronously
@Handler()
public void handle(StandardMessage message) {
throw new RuntimeException("This is an expected exception");
}
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.ICountable;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class ICountableListener {
private static abstract class BaseListener {
@Handler
public void handle(ICountable message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(ICountable message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(ICountable message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(ICountable message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(ICountable message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.IMessage;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class IMessageListener {
private static abstract class BaseListener {
@Handler
public void handle(IMessage message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(IMessage message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(IMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(IMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(IMessage message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.IMultipartMessage;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class IMultipartMessageListener {
private static abstract class BaseListener {
@Handler
public void handle(IMultipartMessage message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(IMultipartMessage message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(IMultipartMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(IMultipartMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(IMultipartMessage message){
super.handle(message);
}
}
}

View File

@ -1,13 +1,13 @@
package net.engio.mbassy.listeners;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This factory will create a list of beans according to some specified configuration.
* It can be used to setup different test scenarios.
* The factory can be used to declaratively specify how many instances of some given classes
* should be created. It will create those instances using reflection and provide a list containing those instances.
* The factory also holds strong references to the instances such that GC will not interfere with tests unless the
* factory is explicitly cleared.
*
* @author bennidi
* Date: 11/22/12
@ -15,24 +15,68 @@ import java.util.Map;
public class ListenerFactory {
private Map<Class, Integer> requiredBeans = new HashMap<Class, Integer>();
private List generatedListeners;
private int requiredSize = 0;
public int getNumberOfListeners(Class listener){
return requiredBeans.containsKey(listener) ? requiredBeans.get(listener) : 0;
}
public ListenerFactory create(int numberOfInstance, Class clazz){
requiredBeans.put(clazz, numberOfInstance);
public ListenerFactory create(int numberOfInstances, Class clazz){
requiredBeans.put(clazz, numberOfInstances);
requiredSize +=numberOfInstances;
return this;
}
public List<Object> build() throws Exception{
List<Object> beans = new LinkedList<Object>();
for(Class clazz : requiredBeans.keySet()){
int numberOfRequiredBeans = requiredBeans.get(clazz);
for(int i = 0; i < numberOfRequiredBeans; i++){
beans.add(clazz.newInstance());
public List<Object> getAll(){
generatedListeners = new ArrayList(requiredSize);
try {
for(Class clazz : requiredBeans.keySet()){
int numberOfRequiredBeans = requiredBeans.get(clazz);
for(int i = 0; i < numberOfRequiredBeans; i++){
generatedListeners.add(clazz.newInstance());
}
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return beans;
Collections.shuffle(generatedListeners);
return generatedListeners;
}
// not thread-safe but not yet used concurrently
public void clear(){
generatedListeners = null;
requiredBeans.clear();
}
/**
* Create a thread-safe read-only iterator
* @return
*/
public Iterator iterator(){
if(generatedListeners == null)getAll();
final AtomicInteger current = new AtomicInteger(0);
return new Iterator() {
@Override
public boolean hasNext() {
return current.get() < generatedListeners.size();
}
@Override
public Object next() {
int index = current.getAndIncrement();
return index < generatedListeners.size() ? generatedListeners.get(index) : null;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Iterator is read only");
}
};
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.MessageTypes;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class MessagesListener {
private static abstract class BaseListener {
@Handler
public void handle(MessageTypes message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(MessageTypes message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(MessageTypes message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(MessageTypes message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(MessageTypes message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.MultipartMessage;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class MultipartMessageListener {
private static abstract class BaseListener {
@Handler
public void handle(MultipartMessage message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(MultipartMessage message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(MultipartMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(MultipartMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(MultipartMessage message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,58 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.messages.StandardMessage;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class StandardMessageListener {
private static abstract class BaseListener {
@Handler
public void handle(StandardMessage message){
message.handled(this.getClass());
}
}
public static class DefaultListener extends BaseListener {
public void handle(StandardMessage message){
super.handle(message);
}
}
public static class NoSubtypesListener extends BaseListener {
@Handler(rejectSubtypes = true)
public void handle(StandardMessage message){
super.handle(message);
}
}
public static class AsyncListener extends BaseListener {
@Handler(delivery = Invoke.Asynchronously)
public void handle(StandardMessage message){
super.handle(message);
}
}
public static class DisabledListener extends BaseListener {
@Handler(enabled = false)
public void handle(StandardMessage message){
super.handle(message);
}
}
}

View File

@ -0,0 +1,55 @@
package net.engio.mbassy.messages;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public abstract class AbstractMessage implements IMessage{
private Map<Class, Integer> handledByListener = new HashMap<Class, Integer>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override
public void reset() {
lock.writeLock().lock();
try {
handledByListener.clear();
}finally {
lock.writeLock().unlock();
}
}
@Override
public void handled(Class listener) {
lock.writeLock().lock();
try {
Integer count = handledByListener.get(listener);
if(count == null){
handledByListener.put(listener, 1);
}
else{
handledByListener.put(listener, count + 1);
}
}finally {
lock.writeLock().unlock();
}
}
@Override
public int getTimesHandled(Class listener) {
lock.readLock().lock();
try {
return handledByListener.containsKey(listener)
? handledByListener.get(listener)
: 0;
}finally {
lock.readLock().unlock();
}
}
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.messages;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class CountableMessage extends AbstractMessage implements ICountable{
}

View File

@ -0,0 +1,16 @@
package net.engio.mbassy.messages;
/**
* Interface analogous to IMessage. Exists to test more complex class/interface hierarchies
*
* @author bennidi
* Date: 5/24/13
*/
public interface ICountable {
void reset();
void handled(Class listener);
int getTimesHandled(Class listener);
}

View File

@ -0,0 +1,16 @@
package net.engio.mbassy.messages;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public interface IMessage {
void reset();
void handled(Class listener);
int getTimesHandled(Class listener);
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.messages;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public interface IMultipartMessage extends IMessage, ICountable{
}

View File

@ -1,10 +0,0 @@
package net.engio.mbassy.messages;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/12/13
*/
public interface ITestMessage {
}

View File

@ -0,0 +1,62 @@
package net.engio.mbassy.messages;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Enum used to test handlers that consume enumerations.
*
* @author bennidi
* Date: 5/24/13
*/
public enum MessageTypes implements IMessage{
Simple,Persistent,Multipart;
public static void resetAll(){
for(MessageTypes m : values())
m.reset();
}
private Map<Class, Integer> handledByListener = new HashMap<Class, Integer>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override
public void reset() {
lock.writeLock().lock();
try {
handledByListener.clear();
}finally {
lock.writeLock().unlock();
}
}
@Override
public void handled(Class listener) {
lock.writeLock().lock();
try {
Integer count = handledByListener.get(listener);
if(count == null){
handledByListener.put(listener, 1);
}
else{
handledByListener.put(listener, count + 1);
}
}finally {
lock.writeLock().unlock();
}
}
@Override
public int getTimesHandled(Class listener) {
lock.readLock().lock();
try {
return handledByListener.containsKey(listener)
? handledByListener.get(listener)
: 0;
}finally {
lock.readLock().unlock();
}
}
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.messages;
/**
*
* @author bennidi
* Date: 5/24/13
*/
public class MultipartMessage extends AbstractMessage implements IMultipartMessage, ICountable{
}

View File

@ -0,0 +1,8 @@
package net.engio.mbassy.messages;
/**
* @author bennidi
* Date: 5/24/13
*/
public class StandardMessage extends AbstractMessage implements ICountable{
}

View File

@ -1,10 +0,0 @@
package net.engio.mbassy.messages;
/**
* A test message that uses an interface
*
* @author durron597
*/
public class TestMessage3 implements ITestMessage {
}