Added SubsciptionManager, fixed #30 and #31

This commit is contained in:
benni 2013-05-12 14:37:08 +02:00
commit 3b215115fb
11 changed files with 261 additions and 82 deletions

View File

@ -23,7 +23,5 @@ public class MessageBusException extends Exception{
super(cause);
}
public MessageBusException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -5,7 +5,7 @@ import java.util.List;
import java.util.concurrent.*;
/**
* The base class for all message bus implementations.
* The base class for all async message bus implementations.
*
* @param <T>
* @param <P>

View File

@ -2,13 +2,16 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.*;
/**
* The base class for all message bus implementations.
@ -20,7 +23,7 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final MessagePublication.Factory publicationFactory;
@ -54,10 +57,23 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
public final void addErrorHandler(IPublicationErrorHandler handler) {
errorHandlers.add(handler);
synchronized (this){
errorHandlers.add(handler);
}
}
protected MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
} else {
return getPublicationFactory().createPublication(this, subscriptions, message);
}
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {

View File

@ -1,10 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@ -23,17 +20,6 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit);
}
private MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
} else {
return getPublicationFactory().createPublication(this, subscriptions, message);
}
}
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)

View File

@ -21,7 +21,7 @@ public class MessagePublication {
public static class Factory {
public MessagePublication createPublication(IMessageBus owningBus, Collection<Subscription> subscriptions, Object message) {
public MessagePublication createPublication(ISyncMessageBus owningBus, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(owningBus, subscriptions, message, State.Initial);
}
@ -35,9 +35,9 @@ public class MessagePublication {
private boolean delivered = false;
private IMessageBus bus;
private ISyncMessageBus bus;
public MessagePublication(IMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
public MessagePublication(ISyncMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
this.bus = bus;
this.subscriptions = subscriptions;
this.message = message;

View File

@ -0,0 +1,57 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.PublicationError;
/**
* Created with IntelliJ IDEA.
* User: benjamin
* Date: 4/3/13
* Time: 9:02 AM
* To change this template use File | Settings | File Templates.
*/
public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.SyncPostCommand>{
public SyncMessageBus(SyncBusConfiguration configuration) {
super(configuration);
}
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message.
*
* @param message
*/
public void publish(T message) {
try {
MessagePublication publication = createMessagePublication(message);
publication.execute();
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(message));
}
}
@Override
public SyncPostCommand post(T message) {
return new SyncPostCommand(message);
}
public class SyncPostCommand implements ISyncMessageBus.ISyncPostCommand{
private T message;
public SyncPostCommand(T message) {
this.message = message;
}
@Override
public void now() {
publish(message);
}
}
}

View File

@ -14,7 +14,9 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
StrongConcurrentSetTest.class,
WeakConcurrentSetTest.class,
MessagePublicationTest.class,
AsynchronousMessageBusTest.class,
SyncBusTest.MBassadorTest.class,
SyncBusTest.SyncMessageBusTest.class,
FilterTest.class,
MetadataReaderTest.class,
ListenerSubscriptionTest.class,

View File

@ -20,7 +20,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
* @author bennidi
* Date: 2/8/12
*/
public class MessagePublicationTest extends MessageBusTest {
public class AsynchronousMessageBusTest extends MessageBusTest {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
@ -61,59 +61,6 @@ public class MessagePublicationTest extends MessageBusTest {
}
@Test
public void testSynchronousMessagePublication() throws Exception {
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
.create(10000, EventingTestBean3.class)
.create(10000, Object.class)
.create(10000, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.publish(message);
bus.publish(subMessage);
pause(processingTimeInMS);
assertEquals(30000, message.counter.get());
assertEquals(70000, subMessage.counter.get());
}
@Test
public void testStrongListenerSubscription() throws Exception {
MBassador bus = getBus(new BusConfiguration());
for(int i = 0; i< 10000; i++){
bus.subscribe(new EventingTestBean2());
}
runGC();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.publish(message);
bus.publish(subMessage);
pause(processingTimeInMS);
assertEquals(10000, message.counter.get());
assertEquals(20000, subMessage.counter.get());
}
@Test
public void testConcurrentMixedMessagePublication() throws Exception {

View File

@ -0,0 +1,170 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.*;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.subscription.SubscriptionContext;
import org.junit.Test;
import java.util.List;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*
* @author bennidi
* Date: 2/8/12
*/
public abstract class SyncBusTest extends MessageBusTest {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
// evaluation is run
private int processingTimeInMS = 4000;
@Test
public void testSynchronousMessagePublication() throws Exception {
ISyncMessageBus bus = getSyncMessageBus();
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, MessageListener1.class)
.create(10000, MessageListener2.class)
.create(10000, MessageListener3.class)
.create(10000, Object.class)
.create(10000, NonListeningBean.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.post(message).now();
bus.post(subMessage).now();
pause(processingTimeInMS);
assertEquals(30000, message.counter.get());
assertEquals(70000, subMessage.counter.get());
}
@Test
public void testStrongListenerSubscription() throws Exception {
ISyncMessageBus bus = getSyncMessageBus();
for(int i = 0; i< 10000; i++){
bus.subscribe(new MessageListener2());
}
runGC();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.post(message).now();
bus.post(subMessage).now();
pause(processingTimeInMS);
assertEquals(10000, message.counter.get());
assertEquals(20000, subMessage.counter.get());
}
protected abstract ISyncMessageBus getSyncMessageBus();
public static class MessageListener1 {
// every event of type TestEvent or any subtype will be delivered
// to this listener
@Handler
public void handleTestEvent(TestMessage message) {
message.counter.incrementAndGet();
}
// this handler will be invoked asynchronously
@Handler(priority = 0, invocation = HandleSubTestEventInvocation.class)
public void handleSubTestEvent(SubTestMessage message) {
message.counter.incrementAndGet();
}
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Handler(
priority = 10,
delivery = Invoke.Synchronously,
filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)})
public void handleFiltered(SubTestMessage message) {
message.counter.incrementAndGet();
}
}
public static class HandleSubTestEventInvocation extends HandlerInvocation<MessageListener1, SubTestMessage> {
public HandleSubTestEventInvocation(SubscriptionContext context) {
super(context);
}
@Override
public void invoke(MessageListener1 listener, SubTestMessage message) {
listener.handleSubTestEvent(message);
}
}
@Listener(references = References.Strong)
public static class MessageListener2 extends net.engio.mbassy.listeners.EventingTestBean {
// redefine the configuration for this handler
@Handler(delivery = Invoke.Synchronously)
public void handleSubTestEvent(SubTestMessage message) {
super.handleSubTestEvent(message);
}
}
@Listener(references = References.Strong)
public static class MessageListener3 extends net.engio.mbassy.listeners.EventingTestBean2 {
// this handler will be invoked asynchronously
@Handler(priority = 0, delivery = Invoke.Synchronously)
public void handleSubTestEventAgain(SubTestMessage message) {
message.counter.incrementAndGet();
}
}
public static class MBassadorTest extends SyncBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new MBassador(BusConfiguration.Default());
}
}
public static class SyncMessageBusTest extends SyncBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new SyncMessageBus(new SyncBusConfiguration());
}
}
}

View File

@ -4,6 +4,7 @@ import junit.framework.Assert;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.bus.MBassador;
/**
@ -13,9 +14,9 @@ import net.engio.mbassy.bus.MBassador;
* @author bennidi
* Date: 3/2/13
*/
public class MessageBusTest extends UnitTest {
public class MessageBusTest<Bus extends ISyncMessageBus> extends UnitTest {
private static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override
public void handleError(PublicationError error) {
Assert.fail();
@ -27,4 +28,5 @@ public class MessageBusTest extends UnitTest {
bus.addErrorHandler(TestFailingHandler);
return bus;
}
}

View File

@ -1,6 +1,7 @@
package net.engio.mbassy.common;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.ISyncMessageBus;
import java.util.List;
@ -13,7 +14,7 @@ import java.util.List;
public class TestUtil {
public static void setup(final IMessageBus bus, final List<Object> listeners, int numberOfThreads) {
public static void setup(final ISyncMessageBus bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;
if(listeners.size() >= numberOfThreads){