From 54f7bd72b90ff7608c97b8ba4708ca95bc4de573 Mon Sep 17 00:00:00 2001 From: Benjamin Diedrichsen Date: Fri, 28 Mar 2014 10:11:21 +0100 Subject: [PATCH] Subtype only filter, small refactorings --- pom.xml | 3 -- src/docs/TODO.md | 22 +++++++++ .../net/engio/mbassy/PublicationError.java | 13 +++++ ...ageBus.java => AbstractPubSubSupport.java} | 8 ++-- .../bus/AbstractSyncAsyncMessageBus.java | 33 +++++++------ .../net/engio/mbassy/bus/IMessageBus.java | 3 +- .../java/net/engio/mbassy/bus/MBassador.java | 4 +- .../engio/mbassy/bus/MessagePublication.java | 6 ++- .../net/engio/mbassy/bus/SyncMessageBus.java | 19 ++------ .../net/engio/mbassy/listener/Filters.java | 46 +++++++++++------- .../java/net/engio/mbassy/FilterTest.java | 47 +++++++++++++++---- .../net/engio/mbassy/common/TestUtil.java | 5 ++ .../engio/mbassy/listeners/Overloading.java | 8 ++-- .../engio/mbassy/messages/MessageTypes.java | 11 +++-- 14 files changed, 153 insertions(+), 75 deletions(-) create mode 100644 src/docs/TODO.md rename src/main/java/net/engio/mbassy/bus/{AbstractSyncMessageBus.java => AbstractPubSubSupport.java} (91%) diff --git a/pom.xml b/pom.xml index 9c33db9..2707fb7 100644 --- a/pom.xml +++ b/pom.xml @@ -211,9 +211,6 @@
mbassador, ${project.version}
mbassador, ${project.version} - - - diff --git a/src/docs/TODO.md b/src/docs/TODO.md new file mode 100644 index 0000000..7270c7e --- /dev/null +++ b/src/docs/TODO.md @@ -0,0 +1,22 @@ +#Tests +Asyncbus.shutdown() -> no test coverage +EnvelopedMessageDispatcher -> not tested at all + +#Refactorings + + +#Improvements +Prio 1: Validation of handlers + ERROR:Handler with mismatching parameter types + ERROR:Interfaces + rejectSubtypes + WARN:@Synchronized only for some handlers of a given listener +Prio 2: Lifecycle Callbacks = Implement in MessagePublication (BeforeStart,AfterCompletion) + + +#Documentation +Add code examples Javadoc of main classes +Describe 1-Thread FIFO scheme with async dispatch +Explain how MBassador can be extended easily using delegation +Refer to Spring integration component +How to make sender part of the message publication +How to add global filtering by means of delegation diff --git a/src/main/java/net/engio/mbassy/PublicationError.java b/src/main/java/net/engio/mbassy/PublicationError.java index 1a7b6c2..3af4b2f 100644 --- a/src/main/java/net/engio/mbassy/PublicationError.java +++ b/src/main/java/net/engio/mbassy/PublicationError.java @@ -1,5 +1,7 @@ package net.engio.mbassy; +import net.engio.mbassy.bus.MessagePublication; + import java.lang.reflect.Method; /** @@ -44,6 +46,17 @@ public class PublicationError{ this.publishedObject = publishedObject; } + public PublicationError(final Throwable cause, + final String message, + final MessagePublication publication) { + this.cause = cause; + this.message = message; + this.publishedObject = publication != null ? publication.getMessage() : null; + } + + + + /** * Default constructor. */ diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java similarity index 91% rename from src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java rename to src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index 164affc..e74be6f 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -3,7 +3,6 @@ package net.engio.mbassy.bus; import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.PublicationError; import net.engio.mbassy.bus.config.ISyncBusConfiguration; -import net.engio.mbassy.bus.publication.IPublicationCommand; import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.SubscriptionManager; @@ -17,9 +16,8 @@ import java.util.List; * The base class for all message bus implementations. * * @param - * @param

*/ -public abstract class AbstractSyncMessageBus implements ISyncMessageBus{ +public abstract class AbstractPubSubSupport implements PubSubSupport{ // this handler will receive all errors that occur during message dispatch or message handling @@ -32,7 +30,7 @@ public abstract class AbstractSyncMessageBus i private final BusRuntime runtime; - public AbstractSyncMessageBus(ISyncBusConfiguration configuration) { + public AbstractPubSubSupport(ISyncBusConfiguration configuration) { this.runtime = new BusRuntime(this); this.runtime.add("error.handlers", getRegisteredErrorHandlers()); this.subscriptionManager = configuration.getSubscriptionManagerProvider() @@ -45,7 +43,7 @@ public abstract class AbstractSyncMessageBus i return publicationFactory; } - @Override + public Collection getRegisteredErrorHandlers() { return Collections.unmodifiableCollection(errorHandlers); } diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index d4187d1..d6b7e27 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -17,7 +17,8 @@ import java.util.concurrent.TimeUnit; * @param * @param

*/ -public abstract class AbstractSyncAsyncMessageBus extends AbstractSyncMessageBus implements IMessageBus { +public abstract class AbstractSyncAsyncMessageBus + extends AbstractPubSubSupport implements IMessageBus { // executor for asynchronous message handlers private final ExecutorService executor; @@ -47,13 +48,15 @@ public abstract class AbstractSyncAsyncMessageBus extends PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport { +public interface IMessageBus + extends PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport, ISyncMessageBus { /** * {@inheritDoc} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index e99f937..ee4206f 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -16,12 +16,12 @@ public class MBassador extends AbstractSyncAsyncMessageBus subscriptions, Object message, State initialState) { @@ -86,6 +86,10 @@ public class MessagePublication { return FilteredMessage.class.isAssignableFrom(message.getClass()); } + public Object getMessage() { + return message; + } + private enum State { Initial, Scheduled, Running, Finished, Error } diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index 72c45f4..9dfb911 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -5,25 +5,18 @@ import net.engio.mbassy.bus.config.ISyncBusConfiguration; import net.engio.mbassy.bus.publication.IPublicationCommand; /** - * Created with IntelliJ IDEA. - * User: benjamin - * Date: 4/3/13 - * Time: 9:02 AM - * To change this template use File | Settings | File Templates. + * A message bus implementation that offers only synchronous message publication. Using this bus + * will not create any new threads. + * */ -public class SyncMessageBus extends AbstractSyncMessageBus{ +public class SyncMessageBus extends AbstractPubSubSupport implements ISyncMessageBus{ public SyncMessageBus(ISyncBusConfiguration configuration) { super(configuration); } - /** - * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) - * The call blocks until every messageHandler has processed the message. - * - * @param message - */ + @Override public void publish(T message) { try { MessagePublication publication = createMessagePublication(message); @@ -34,7 +27,6 @@ public class SyncMessageBus extends AbstractSyncMessageBus extends AbstractSyncMessageBus handledByListener = new HashMap(); + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static void resetAll(){ for(MessageTypes m : values()) m.reset(); } - private Map handledByListener = new HashMap(); - private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - @Override public void reset() {