split message publication test, introduced message manager for easy synchronization of async messages

This commit is contained in:
benjamin 2013-06-26 16:23:05 +02:00
parent 40109de6ee
commit 1ecf5ebef8
6 changed files with 339 additions and 54 deletions

View File

@ -2,13 +2,8 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.listeners.ExceptionThrowingListener;
import net.engio.mbassy.listeners.IMessageListener;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.listeners.MessagesListener;
import net.engio.mbassy.common.*;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.messages.MessageTypes;
import net.engio.mbassy.messages.MultipartMessage;
import net.engio.mbassy.messages.StandardMessage;
@ -26,21 +21,14 @@ public class MBassadorTest extends MessageBusTest {
@Test
public void testSynchronousMessagePublication() throws Exception {
public void testSyncPublicationSyncHandlers() throws Exception {
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.AsyncListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, Object.class);
.create(InstancesPerListener, Listeners.synchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
@ -51,74 +39,84 @@ public class MBassadorTest extends MessageBusTest {
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
pause(processingTimeInMS);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class));
}
};
// test single-threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
// test multi-threaded
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
}
@Test
public void testSyncPublicationAsyncHandlers() throws Exception {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = messageManager.create(StandardMessage.class, InstancesPerListener, Listeners.join(Listeners.asynchronous(), Listeners.handlesStandardMessage()));
MultipartMessage multipartMessage = messageManager.create(MultipartMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class, IMultipartMessageListener.AsyncListener.class);
bus.post(standardMessage).now();
bus.post(multipartMessage).now();
bus.post(MessageTypes.Simple).now();
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
messageManager.waitForMessages(processingTimeInMS);
MessageTypes.resetAll();
messageManager.register(MessageTypes.Simple, InstancesPerListener * ConcurrentUnits, IMessageListener.AsyncListener.class, MessagesListener.AsyncListener.class);
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class));
messageManager.waitForMessages(processingTimeInMS);
}
@Test
public void testAsynchronousMessagePublication() throws Exception {
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
.create(InstancesPerListener, MessagesListener.DefaultListener.class)
.create(InstancesPerListener, MessagesListener.AsyncListener.class)
.create(InstancesPerListener, MessagesListener.DisabledListener.class)
.create(InstancesPerListener, Object.class);
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(new BusConfiguration(), listeners);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
StandardMessage standardMessage = new StandardMessage();
MultipartMessage multipartMessage = new MultipartMessage();
StandardMessage standardMessage = messageManager.create(StandardMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class);
MultipartMessage multipartMessage = messageManager.create(MultipartMessage.class, InstancesPerListener, IMessageListener.AsyncListener.class);
bus.post(standardMessage).asynchronously();
bus.post(multipartMessage).asynchronously();
bus.post(MessageTypes.Simple).asynchronously();
pause(processingTimeInMS);
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class));
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
messageManager.waitForMessages(processingTimeInMS);
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class));
messageManager.waitForMessages(processingTimeInMS);
}

View File

@ -30,6 +30,18 @@ public class ListenerFactory {
return this;
}
public ListenerFactory create(int numberOfInstances, Class[] classes){
for(Class clazz : classes)
create(numberOfInstances,clazz);
return this;
}
public ListenerFactory create(int numberOfInstances, Collection<Class> classes){
for(Class clazz : classes)
create(numberOfInstances,clazz);
return this;
}
public synchronized List<Object> getAll(){
if(generatedListeners != null)

View File

@ -46,4 +46,11 @@ public abstract class MessageBusTest<Bus extends ISyncMessageBus> extends Assert
return bus;
}
public MBassador getBus(BusConfiguration configuration, ListenerFactory listeners) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
return bus;
}
}

View File

@ -0,0 +1,156 @@
package net.engio.mbassy.common;
import net.engio.mbassy.messages.IMessage;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 6/26/13
* Time: 12:23 PM
* To change this template use File | Settings | File Templates.
*/
public class MessageManager {
private StrongConcurrentSet<MessageContext> messages = new StrongConcurrentSet();
public <T extends IMessage> T create(Class<T> messageType, int expectedCount, Class ...listeners){
T message;
try {
message = messageType.newInstance();
register(message, expectedCount, listeners);
} catch (Exception e) {
throw new RuntimeException(e);
}
return message;
}
public <T extends IMessage> T create(Class<T> messageType, int expectedCount, Collection<Class> listeners){
T message;
try {
message = messageType.newInstance();
register(message, expectedCount, listeners);
} catch (Exception e) {
throw new RuntimeException(e);
}
return message;
}
public <T extends IMessage> void register(T message, int expectedCount, Class ...listeners){
try {
messages.add(new MessageContext(expectedCount, message, listeners));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public <T extends IMessage> void register(T message, int expectedCount, Collection<Class> listeners){
try {
messages.add(new MessageContext(expectedCount, message, listeners));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void waitForMessages(int timeoutInMs){
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < timeoutInMs && messages.size() > 0){
// check each created message once
for(MessageContext messageCtx : messages){
boolean handledCompletely = true;
for(Class listener : messageCtx.getListeners()){
handledCompletely &= messageCtx.getMessage().getTimesHandled(listener) == messageCtx.getExpectedCount();
}
// remove the ones that were handled as expected
if(handledCompletely){
logSuccess(messageCtx);
messages.remove(messageCtx);
}
}
pause(100);
}
if(messages.size() > 0){
logFailingMessages(messages);
throw new RuntimeException("Message were not fully processed in given time");
}
}
private void pause(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void logSuccess(MessageContext mCtx){
System.out.println("Message " + mCtx.getMessage() + " was successfully handled " + mCtx.getExpectedCount() + " times by " + mCtx.printListeners());
}
private void logFailingMessages(StrongConcurrentSet<MessageContext> failing){
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Failing messages:\n");
for(MessageContext failingMessage : failing)
errorMessage.append(failingMessage);
System.out.println(errorMessage.toString());
}
private class MessageContext{
private long expectedCount;
private IMessage message;
private Class[] listeners;
private MessageContext(long expectedCount, IMessage message, Class[] listeners) {
this.expectedCount = expectedCount;
this.message = message;
this.listeners = listeners;
}
private MessageContext(long expectedCount, IMessage message, Collection<Class> listeners) {
this.expectedCount = expectedCount;
this.message = message;
this.listeners = listeners.toArray(new Class[]{});
}
private long getExpectedCount() {
return expectedCount;
}
private IMessage getMessage() {
return message;
}
private Class[] getListeners() {
return listeners;
}
private String printListeners(){
StringBuilder listenersAsString = new StringBuilder();
for(Class listener : listeners){
listenersAsString.append(listener.getName());
listenersAsString.append(",");
}
return listenersAsString.toString();
}
@Override
public String toString() {
// TODO: actual count of listeners
return message.getClass().getSimpleName() + "{" +
"expectedCount=" + expectedCount +
", listeners=" + printListeners() +
'}';
}
}
}

View File

@ -0,0 +1,112 @@
package net.engio.mbassy.listeners;
import java.util.*;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 6/26/13
* Time: 11:48 AM
* To change this template use File | Settings | File Templates.
*/
public class Listeners {
private static final List<Class> Synchronous = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.DefaultListener.class,
IMessageListener.DefaultListener.class,
StandardMessageListener.DefaultListener.class,
MultipartMessageListener.DefaultListener.class,
ICountableListener.DefaultListener.class,
IMultipartMessageListener.DefaultListener.class}));
private static final List<Class> Asynchronous = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.AsyncListener.class,
IMessageListener.AsyncListener.class,
StandardMessageListener.AsyncListener.class,
MultipartMessageListener.AsyncListener.class,
ICountableListener.AsyncListener.class,
IMultipartMessageListener.AsyncListener.class}));
private static final List<Class> SubtypeRejecting = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.NoSubtypesListener.class,
IMessageListener.NoSubtypesListener.class,
StandardMessageListener.NoSubtypesListener.class,
MultipartMessageListener.NoSubtypesListener.class,
ICountableListener.NoSubtypesListener.class,
IMultipartMessageListener.NoSubtypesListener.class}));
private static final List<Class> NoHandlers = Collections.unmodifiableList(Arrays.asList(new Class[]{
MessagesListener.DisabledListener.class,
IMessageListener.DisabledListener.class,
StandardMessageListener.DisabledListener.class,
MultipartMessageListener.DisabledListener.class,
ICountableListener.DisabledListener.class,
IMultipartMessageListener.DisabledListener.class,
Object.class,String.class}));
private static final List<Class> HandlesIMessage = Collections.unmodifiableList(Arrays.asList(new Class[]{
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
IMessageListener.NoSubtypesListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.AsyncListener.class,
IMultipartMessageListener.NoSubtypesListener.class,
MessagesListener.DefaultListener.class,
MessagesListener.AsyncListener.class,
MessagesListener.NoSubtypesListener.class,
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.NoSubtypesListener.class,
MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.AsyncListener.class,
MultipartMessageListener.NoSubtypesListener.class}));
private static final List<Class> HandlesStandardessage = Collections.unmodifiableList(Arrays.asList(new Class[]{
IMessageListener.DefaultListener.class,
IMessageListener.AsyncListener.class,
ICountableListener.DefaultListener.class,
ICountableListener.AsyncListener.class,
StandardMessageListener.DefaultListener.class,
StandardMessageListener.AsyncListener.class,
StandardMessageListener.NoSubtypesListener.class}));
public static Collection<Class> synchronous(){
return Synchronous;
}
public static Collection<Class> asynchronous(){
return Asynchronous;
}
public static Collection<Class> subtypeRejecting(){
return SubtypeRejecting;
}
public static Collection<Class> noHandlers(){
return NoHandlers;
}
public static Collection<Class> handlesIMessage(){
return HandlesIMessage;
}
public static Collection<Class> handlesStandardMessage(){
return HandlesStandardessage;
}
public static Collection<Class> join(Collection<Class>...listenerSets){
Set<Class> join = new HashSet<Class>();
for(Collection<Class> listeners : listenerSets)
join.addAll(listeners);
for(Collection<Class> listeners : listenerSets)
join.retainAll(listeners);
return join;
}
}

View File

@ -24,8 +24,8 @@ public enum MessageTypes implements IMessage{
@Override
public void reset() {
lock.writeLock().lock();
try {
lock.writeLock().lock();
handledByListener.clear();
}finally {
lock.writeLock().unlock();
@ -34,8 +34,8 @@ public enum MessageTypes implements IMessage{
@Override
public void handled(Class listener) {
lock.writeLock().lock();
try {
lock.writeLock().lock();
Integer count = handledByListener.get(listener);
if(count == null){
handledByListener.put(listener, 1);
@ -50,8 +50,8 @@ public enum MessageTypes implements IMessage{
@Override
public int getTimesHandled(Class listener) {
lock.readLock().lock();
try {
lock.readLock().lock();
return handledByListener.containsKey(listener)
? handledByListener.get(listener)
: 0;