Subtype only filter, small refactorings

This commit is contained in:
Benjamin Diedrichsen 2014-03-28 10:11:21 +01:00
parent d84b3877fa
commit 54f7bd72b9
14 changed files with 153 additions and 75 deletions

View File

@ -211,9 +211,6 @@
<header>mbassador, ${project.version}</header> <header>mbassador, ${project.version}</header>
<footer>mbassador, ${project.version}</footer> <footer>mbassador, ${project.version}</footer>
<doctitle>mbassador, ${project.version}</doctitle> <doctitle>mbassador, ${project.version}</doctitle>
<links>
<link></link>
</links>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>

22
src/docs/TODO.md Normal file
View File

@ -0,0 +1,22 @@
#Tests
Asyncbus.shutdown() -> no test coverage
EnvelopedMessageDispatcher -> not tested at all
#Refactorings
#Improvements
Prio 1: Validation of handlers
ERROR:Handler with mismatching parameter types
ERROR:Interfaces + rejectSubtypes
WARN:@Synchronized only for some handlers of a given listener
Prio 2: Lifecycle Callbacks = Implement in MessagePublication (BeforeStart,AfterCompletion)
#Documentation
Add code examples Javadoc of main classes
Describe 1-Thread FIFO scheme with async dispatch
Explain how MBassador can be extended easily using delegation
Refer to Spring integration component
How to make sender part of the message publication
How to add global filtering by means of delegation

View File

@ -1,5 +1,7 @@
package net.engio.mbassy; package net.engio.mbassy;
import net.engio.mbassy.bus.MessagePublication;
import java.lang.reflect.Method; import java.lang.reflect.Method;
/** /**
@ -44,6 +46,17 @@ public class PublicationError{
this.publishedObject = publishedObject; this.publishedObject = publishedObject;
} }
public PublicationError(final Throwable cause,
final String message,
final MessagePublication publication) {
this.cause = cause;
this.message = message;
this.publishedObject = publication != null ? publication.getMessage() : null;
}
/** /**
* Default constructor. * Default constructor.
*/ */

View File

@ -3,7 +3,6 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError; import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.config.ISyncBusConfiguration; import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.publication.IPublicationCommand;
import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager; import net.engio.mbassy.subscription.SubscriptionManager;
@ -17,9 +16,8 @@ import java.util.List;
* The base class for all message bus implementations. * The base class for all message bus implementations.
* *
* @param <T> * @param <T>
* @param <P>
*/ */
public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> implements ISyncMessageBus<T, P>{ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>{
// this handler will receive all errors that occur during message dispatch or message handling // this handler will receive all errors that occur during message dispatch or message handling
@ -32,7 +30,7 @@ public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> i
private final BusRuntime runtime; private final BusRuntime runtime;
public AbstractSyncMessageBus(ISyncBusConfiguration configuration) { public AbstractPubSubSupport(ISyncBusConfiguration configuration) {
this.runtime = new BusRuntime(this); this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers()); this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.subscriptionManager = configuration.getSubscriptionManagerProvider() this.subscriptionManager = configuration.getSubscriptionManagerProvider()
@ -45,7 +43,7 @@ public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> i
return publicationFactory; return publicationFactory;
} }
@Override
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() { public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers); return Collections.unmodifiableCollection(errorHandlers);
} }

View File

@ -17,7 +17,8 @@ import java.util.concurrent.TimeUnit;
* @param <T> * @param <T>
* @param <P> * @param <P>
*/ */
public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand> extends AbstractSyncMessageBus<T, P> implements IMessageBus<T, P> { public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends AbstractPubSubSupport<T> implements IMessageBus<T, P> {
// executor for asynchronous message handlers // executor for asynchronous message handlers
private final ExecutorService executor; private final ExecutorService executor;
@ -47,13 +48,15 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
Thread dispatcher = configuration.getThreadFactoryForAsynchronousMessageDispatch().newThread(new Runnable() { Thread dispatcher = configuration.getThreadFactoryForAsynchronousMessageDispatch().newThread(new Runnable() {
public void run() { public void run() {
while (true) { while (true) {
MessagePublication publication = null;
try { try {
pendingMessages.take().execute(); publication = pendingMessages.take();
publication.execute();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; return;
} catch(Throwable t){ } catch(Throwable t){
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", null, null, null)); handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",publication));
} }
} }
} }
@ -64,26 +67,26 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
} }
// this method enqueues a message delivery request // this method queues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) { protected MessagePublication addAsynchronousPublication(MessagePublication publication) {
try { try {
pendingMessages.put(request); pendingMessages.put(publication);
return request.markScheduled(); return publication.markScheduled();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO: publication error handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
return request; return publication;
} }
} }
// this method queues a message delivery request // this method queues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) { protected MessagePublication addAsynchronousPublication(MessagePublication publication, long timeout, TimeUnit unit) {
try { try {
return pendingMessages.offer(request, timeout, unit) return pendingMessages.offer(publication, timeout, unit)
? request.markScheduled() ? publication.markScheduled()
: request; : publication;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO: publication error handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
return request; return publication;
} }
} }

View File

@ -57,7 +57,8 @@ import java.util.concurrent.Executor;
* @Author bennidi * @Author bennidi
* Date: 2/8/12 * Date: 2/8/12
*/ */
public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P> { public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P>, ISyncMessageBus<T,P> {
/** /**
* {@inheritDoc} * {@inheritDoc}

View File

@ -16,12 +16,12 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
@Override @Override
public MessagePublication publishAsync(T message) { public MessagePublication publishAsync(T message) {
return addAsynchronousDeliveryRequest(createMessagePublication(message)); return addAsynchronousPublication(createMessagePublication(message));
} }
@Override @Override
public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) { public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit); return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
} }

View File

@ -23,7 +23,7 @@ public class MessagePublication {
private final Object message; private final Object message;
// message publications can be referenced by multiple threads to query publication progress // message publications can be referenced by multiple threads to query publication progress
private volatile State state = State.Initial; private volatile State state = State.Initial;
private volatile boolean delivered = false; private volatile boolean delivered = false; // TODO: maybe replace with return value of subscription and dispatchers?
private final BusRuntime runtime; private final BusRuntime runtime;
protected MessagePublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message, State initialState) { protected MessagePublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message, State initialState) {
@ -86,6 +86,10 @@ public class MessagePublication {
return FilteredMessage.class.isAssignableFrom(message.getClass()); return FilteredMessage.class.isAssignableFrom(message.getClass());
} }
public Object getMessage() {
return message;
}
private enum State { private enum State {
Initial, Scheduled, Running, Finished, Error Initial, Scheduled, Running, Finished, Error
} }

View File

@ -5,25 +5,18 @@ import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.publication.IPublicationCommand; import net.engio.mbassy.bus.publication.IPublicationCommand;
/** /**
* Created with IntelliJ IDEA. * A message bus implementation that offers only synchronous message publication. Using this bus
* User: benjamin * will not create any new threads.
* Date: 4/3/13 *
* Time: 9:02 AM
* To change this template use File | Settings | File Templates.
*/ */
public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.SyncPostCommand>{ public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements ISyncMessageBus<T, SyncMessageBus.SyncPostCommand>{
public SyncMessageBus(ISyncBusConfiguration configuration) { public SyncMessageBus(ISyncBusConfiguration configuration) {
super(configuration); super(configuration);
} }
/** @Override
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message.
*
* @param message
*/
public void publish(T message) { public void publish(T message) {
try { try {
MessagePublication publication = createMessagePublication(message); MessagePublication publication = createMessagePublication(message);
@ -34,7 +27,6 @@ public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.
.setCause(e) .setCause(e)
.setPublishedObject(message)); .setPublishedObject(message));
} }
} }
@Override @Override
@ -44,7 +36,6 @@ public class SyncMessageBus<T> extends AbstractSyncMessageBus<T, SyncMessageBus.
public class SyncPostCommand implements IPublicationCommand { public class SyncPostCommand implements IPublicationCommand {
private T message; private T message;
public SyncPostCommand(T message) { public SyncPostCommand(T message) {

View File

@ -1,31 +1,23 @@
package net.engio.mbassy.listener; package net.engio.mbassy.listener;
/** /**
* Some sample filters that are not particularly useful in production environment * A set of standard filters for common use cases.
* but illustrate how filters are meant to be used.
* *
* @author bennidi * @author bennidi
* Date: 12/12/12 * Date: 12/12/12
*/ */
public class Filters { public class Filters {
public static final class AllowAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandler metadata) {
return true;
}
}
public static final class RejectAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandler metadata) {
return false;
}
}
/**
* This filter will only accept messages of the exact same type
* as specified for the handler. Subclasses (this includes interface implementations)
* will be rejected.
*
* NOTE: The same functionality (with better performance) is achieved using {@code rejectSubtypes = true}
* in the @Handler annotation
*/
public static final class RejectSubtypes implements IMessageFilter { public static final class RejectSubtypes implements IMessageFilter {
@Override @Override
@ -38,4 +30,24 @@ public class Filters {
return false; return false;
} }
} }
/**
* This filter will only accept messages that are real subtypes
* of the specified message types handled by the message handler.
* Example: If the handler handles Object.class the filter accepts
* all objects except any direct instance of Object.class {@code new Object()}
*/
public static final class SubtypesOnly implements IMessageFilter{
@Override
public boolean accepts(Object message, MessageHandler metadata) {
for(Class acceptedClasses : metadata.getHandledMessages()){
if(acceptedClasses.isAssignableFrom(message.getClass())
&& ! acceptedClasses.equals(message.getClass()))
return true;
}
return false;
}
}
} }

View File

@ -1,15 +1,11 @@
package net.engio.mbassy; package net.engio.mbassy;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.FilteredMessage; import net.engio.mbassy.common.*;
import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.listener.*;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.messages.SubTestMessage; import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage; import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.common.ListenerFactory;
import org.junit.Test; import org.junit.Test;
import java.util.List; import java.util.List;
@ -88,7 +84,7 @@ public class FilterTest extends MessageBusTest {
} }
// will cause republication of a FilteredEvent // will cause republication of a FilteredEvent
@Handler(filters = {@Filter(Filters.RejectAll.class)}) @Handler(filters = {@Filter(RejectAll.class)})
public void handleNone(Object any){ public void handleNone(Object any){
FilteredEventCounter.incrementAndGet(); FilteredEventCounter.incrementAndGet();
} }
@ -98,7 +94,34 @@ public class FilterTest extends MessageBusTest {
public void handleDead(DeadMessage dead){ public void handleDead(DeadMessage dead){
DeadEventCounter.incrementAndGet(); DeadEventCounter.incrementAndGet();
} }
}
@Test
public void testSubtypesOnly(){
MBassador bus = getBus(BusConfiguration.Default());
ListenerFactory listeners = new ListenerFactory()
.create(100, TestMessageHandler.class);
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestMessage supertype = new TestMessage();
TestMessage subtype = new SubTestMessage();
bus.publish(supertype);
bus.publish(subtype);
assertEquals(100, subtype.counter.get());
assertEquals(0, supertype.counter.get());
}
public static class TestMessageHandler{
@Handler(filters = @Filter(Filters.SubtypesOnly.class))
public void handle(TestMessage message){
message.counter.incrementAndGet();
}
} }
@ -113,4 +136,12 @@ public class FilterTest extends MessageBusTest {
} }
} }
public static final class RejectAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandler metadata) {
return false;
}
}
} }

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.common; package net.engio.mbassy.common;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.PubSubSupport; import net.engio.mbassy.bus.PubSubSupport;
import net.engio.mbassy.subscription.SubscriptionManager; import net.engio.mbassy.subscription.SubscriptionManager;
@ -100,4 +101,8 @@ public class TestUtil {
} }
public static void setup(MBassador bus, ListenerFactory listeners, int numberOfThreads) {
setup(bus, listeners.getAll(), numberOfThreads);
}
} }

View File

@ -6,11 +6,9 @@ import net.engio.mbassy.listener.References;
import net.engio.mbassy.messages.AbstractMessage; import net.engio.mbassy.messages.AbstractMessage;
/** /**
* Created with IntelliJ IDEA. * Some handlers and message types to test correct functioning of overloaded
* User: benjamin * message handlers
* Date: 7/11/13 *
* Time: 10:11 AM
* To change this template use File | Settings | File Templates.
*/ */
public class Overloading { public class Overloading {

View File

@ -11,16 +11,19 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Date: 5/24/13 * Date: 5/24/13
*/ */
public enum MessageTypes implements IMessage{ public enum MessageTypes implements IMessage{
Simple,Persistent,Multipart;
Simple,
Persistent,
Multipart;
private Map<Class, Integer> handledByListener = new HashMap<Class, Integer>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void resetAll(){ public static void resetAll(){
for(MessageTypes m : values()) for(MessageTypes m : values())
m.reset(); m.reset();
} }
private Map<Class, Integer> handledByListener = new HashMap<Class, Integer>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override @Override
public void reset() { public void reset() {