From 2a51bb4378a8518c6191ae9440f5d4b4e6fbcc99 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 16 Nov 2014 00:55:56 +0100 Subject: [PATCH 1/3] Changed MessagePublication to interface (IMessagePublication) to permit extending the MessagePublication and Factories --- .../mbassy/bus/AbstractPubSubSupport.java | 4 +- .../bus/AbstractSyncAsyncMessageBus.java | 8 ++-- .../engio/mbassy/bus/IMessagePublication.java | 40 +++++++++++++++++++ .../java/net/engio/mbassy/bus/MBassador.java | 6 +-- .../engio/mbassy/bus/MessagePublication.java | 4 +- .../net/engio/mbassy/bus/SyncMessageBus.java | 2 +- .../net/engio/mbassy/bus/config/Feature.java | 9 +++-- .../mbassy/bus/error/PublicationError.java | 4 +- .../ISyncAsyncPublicationCommand.java | 6 +-- .../bus/publication/SyncAsyncPostCommand.java | 6 +-- .../dispatch/EnvelopedMessageDispatcher.java | 4 +- .../dispatch/FilteredMessageDispatcher.java | 4 +- .../mbassy/dispatch/IMessageDispatcher.java | 4 +- .../mbassy/dispatch/MessageDispatcher.java | 4 +- .../mbassy/subscription/Subscription.java | 4 +- .../engio/mbassy/SynchronizedHandlerTest.java | 4 +- .../engio/mbassy/common/MessageBusTest.java | 12 +++--- 17 files changed, 83 insertions(+), 42 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/bus/IMessagePublication.java diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index 8e99787..a18febc 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -38,7 +38,7 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { // configure the pub sub feature Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class); this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider() - .createManager(pubSubFeature.getMetadataReader(), + .createManager(pubSubFeature.getMetadataReader(), pubSubFeature.getSubscriptionFactory(), runtime); this.publicationFactory = pubSubFeature.getPublicationFactory(); } @@ -73,7 +73,7 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { return runtime; } - protected MessagePublication createMessagePublication(T message) { + protected IMessagePublication createMessagePublication(T message) { Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) { // Dead Event diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index 69bc587..96e1d95 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -29,7 +29,7 @@ public abstract class AbstractSyncAsyncMessageBus dispatchers; // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue pendingMessages; + private final BlockingQueue pendingMessages; protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) { super(configuration); @@ -55,7 +55,7 @@ public abstract class AbstractSyncAsyncMessageBus + * A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to + * be used in multiple threads simultaneously . + * + * @author bennidi + * Date: 11/16/12 + */ +public interface IMessagePublication { + + public boolean add(Subscription subscription); + + /* + TODO: document state transitions + */ + public void execute(); + + public boolean isFinished(); + + public boolean isRunning(); + + public boolean isScheduled(); + + public void markDelivered(); + + public IMessagePublication markScheduled(); + + public boolean isDeadEvent(); + + public boolean isFilteredEvent(); + + public Object getMessage(); +} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index 73951da..ec249f4 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -24,11 +24,11 @@ public class MBassador extends AbstractSyncAsyncMessageBus extends AbstractSyncAsyncMessageBus subscriptions; private final Object message; @@ -40,7 +40,7 @@ public class MessagePublication { /* TODO: document state transitions */ - protected void execute() { + public void execute() { state = State.Running; for (Subscription sub : subscriptions) { sub.publish(this, message); diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index 8d8c527..5cd4892 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -22,7 +22,7 @@ public class SyncMessageBus extends AbstractPubSubSupport implements PubSu @Override public void publish(T message) { try { - MessagePublication publication = createMessagePublication(message); + IMessagePublication publication = createMessagePublication(message); publication.execute(); } catch (Throwable e) { handlePublicationError(new PublicationError() diff --git a/src/main/java/net/engio/mbassy/bus/config/Feature.java b/src/main/java/net/engio/mbassy/bus/config/Feature.java index a091298..5d0cec5 100644 --- a/src/main/java/net/engio/mbassy/bus/config/Feature.java +++ b/src/main/java/net/engio/mbassy/bus/config/Feature.java @@ -1,5 +1,6 @@ package net.engio.mbassy.bus.config; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.subscription.ISubscriptionManagerProvider; @@ -132,12 +133,12 @@ public interface Feature { return new AsynchronousMessageDispatch() .setNumberOfMessageDispatchers(2) .setDispatcherThreadFactory(MessageDispatchThreadFactory) - .setMessageQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)); + .setMessageQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)); } private int numberOfMessageDispatchers; - private BlockingQueue pendingMessages; + private BlockingQueue pendingMessages; private ThreadFactory dispatcherThreadFactory; public int getNumberOfMessageDispatchers() { @@ -149,11 +150,11 @@ public interface Feature { return this; } - public BlockingQueue getPendingMessages() { + public BlockingQueue getPendingMessages() { return pendingMessages; } - public AsynchronousMessageDispatch setMessageQueue(BlockingQueue pendingMessages) { + public AsynchronousMessageDispatch setMessageQueue(BlockingQueue pendingMessages) { this.pendingMessages = pendingMessages; return this; } diff --git a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java index 0903f0f..dc98555 100644 --- a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java +++ b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java @@ -1,6 +1,6 @@ package net.engio.mbassy.bus.error; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import java.lang.reflect.Method; @@ -48,7 +48,7 @@ public class PublicationError{ public PublicationError(final Throwable cause, final String message, - final MessagePublication publication) { + final IMessagePublication publication) { this.cause = cause; this.message = message; this.publishedObject = publication != null ? publication.getMessage() : null; diff --git a/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java b/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java index 0a79cbf..d5ad5b3 100644 --- a/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java +++ b/src/main/java/net/engio/mbassy/bus/publication/ISyncAsyncPublicationCommand.java @@ -1,6 +1,6 @@ package net.engio.mbassy.bus.publication; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import java.util.concurrent.TimeUnit; @@ -19,7 +19,7 @@ public interface ISyncAsyncPublicationCommand extends IPublicationCommand { * * @return A message publication that can be used to access information about the state of */ - MessagePublication asynchronously(); + IMessagePublication asynchronously(); /** * Execute the message publication asynchronously. The behaviour of this method depends on the @@ -31,5 +31,5 @@ public interface ISyncAsyncPublicationCommand extends IPublicationCommand { * * @return A message publication that wraps up the publication request */ - MessagePublication asynchronously(long timeout, TimeUnit unit); + IMessagePublication asynchronously(long timeout, TimeUnit unit); } diff --git a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java b/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java index 9473bd5..87601da 100644 --- a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java +++ b/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java @@ -1,7 +1,7 @@ package net.engio.mbassy.bus.publication; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import java.util.concurrent.TimeUnit; @@ -27,12 +27,12 @@ public class SyncAsyncPostCommand implements ISyncAsyncPublicationCommand { } @Override - public MessagePublication asynchronously() { + public IMessagePublication asynchronously() { return mBassador.publishAsync(message); } @Override - public MessagePublication asynchronously(long timeout, TimeUnit unit) { + public IMessagePublication asynchronously(long timeout, TimeUnit unit) { return mBassador.publishAsync(message, timeout, unit); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java index 07a824e..f2eb79b 100644 --- a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.subscription.MessageEnvelope; /** @@ -20,7 +20,7 @@ public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher { } @Override - public void dispatch(MessagePublication publication, Object message, Iterable listeners){ + public void dispatch(IMessagePublication publication, Object message, Iterable listeners){ getDelegate().dispatch(publication, new MessageEnvelope(message), listeners); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java index 2b3b446..d46bdda 100644 --- a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.listener.IMessageFilter; /** @@ -36,7 +36,7 @@ public final class FilteredMessageDispatcher extends DelegatingMessageDispatcher @Override - public void dispatch(MessagePublication publication, Object message, Iterable listeners){ + public void dispatch(IMessagePublication publication, Object message, Iterable listeners){ if (passesFilter(message)) { getDelegate().dispatch(publication, message, listeners); } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java index 9bed84f..23fcafe 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.subscription.ISubscriptionContextAware; /** @@ -29,7 +29,7 @@ public interface IMessageDispatcher extends ISubscriptionContextAware { * @param message The message that should be delivered to the listeners * @param listeners The listeners that should receive the message */ - void dispatch(MessagePublication publication, Object message, Iterable listeners); + void dispatch(IMessagePublication publication, Object message, Iterable listeners); /** * Get the handler invocation that will be used to deliver the diff --git a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java index d853871..2aef9e9 100644 --- a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import net.engio.mbassy.subscription.SubscriptionContext; @@ -24,7 +24,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen } @Override - public void dispatch(final MessagePublication publication, final Object message, final Iterable listeners){ + public void dispatch(final IMessagePublication publication, final Object message, final Iterable listeners){ publication.markDelivered(); for (Object listener : listeners) { getInvocation().invoke(listener, message); diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index a123259..8677b8a 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -1,6 +1,6 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.dispatch.IMessageDispatcher; @@ -68,7 +68,7 @@ public class Subscription { } - public void publish(MessagePublication publication, Object message){ + public void publish(IMessagePublication publication, Object message){ if(listeners.size() > 0) dispatcher.dispatch(publication, message, listeners); } diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index 60bc667..0c3b9dd 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -1,6 +1,6 @@ package net.engio.mbassy; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; @@ -39,7 +39,7 @@ public class SynchronizedHandlerTest extends MessageBusTest { bus.subscribe(handler); } - MessagePublication publication = null; + IMessagePublication publication = null; for(int i = 0; i < numberOfMessages; i++){ publication = bus.post(new Object()).asynchronously(); } diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index 9642ad9..78e7553 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -2,7 +2,7 @@ package net.engio.mbassy.common; import junit.framework.Assert; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; @@ -37,11 +37,11 @@ public abstract class MessageBusTest extends AssertSupport { }; - private StrongConcurrentSet issuedPublications = new StrongConcurrentSet(); + private StrongConcurrentSet issuedPublications = new StrongConcurrentSet(); @Before public void setUp(){ - issuedPublications = new StrongConcurrentSet(); + issuedPublications = new StrongConcurrentSet(); for(MessageTypes mes : MessageTypes.values()) mes.reset(); } @@ -66,14 +66,14 @@ public abstract class MessageBusTest extends AssertSupport { return bus; } - protected void track(MessagePublication asynchronously) { + protected void track(IMessagePublication asynchronously) { issuedPublications.add(asynchronously); } public void waitForPublications(long timeOutInMs){ long start = System.currentTimeMillis(); while(issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){ - for(MessagePublication pub : issuedPublications){ + for(IMessagePublication pub : issuedPublications){ if(pub.isFinished()) issuedPublications.remove(pub); } @@ -82,7 +82,7 @@ public abstract class MessageBusTest extends AssertSupport { fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms"); } - public void addPublication(MessagePublication publication){ + public void addPublication(IMessagePublication publication){ issuedPublications.add(publication); } From 1bb957769ff180c687e843f7da8be4b6cc02b9e9 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 16 Nov 2014 01:11:35 +0100 Subject: [PATCH 2/3] Added eclipse classes dir and libs to .gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index dbe3c15..50c92ce 100644 --- a/.gitignore +++ b/.gitignore @@ -13,8 +13,11 @@ # root of compiled classes # target/**/* target/** +classes/ + # the local maven repository # +lib/ mvn-local-repo/**/* release.properties /.classpath From 7b3032877afb7ea1cf7bb00154b081cdda65d75c Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 16 Nov 2014 14:32:43 +0100 Subject: [PATCH 3/3] Factory returns interface instead of object to allow for extending message publication --- src/main/java/net/engio/mbassy/bus/MessagePublication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java index 481b4e8..2bc1a0e 100644 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/MessagePublication.java @@ -99,7 +99,7 @@ public class MessagePublication implements IMessagePublication { public static class Factory { - public MessagePublication createPublication(BusRuntime runtime, Collection subscriptions, Object message) { + public IMessagePublication createPublication(BusRuntime runtime, Collection subscriptions, Object message) { return new MessagePublication(runtime, subscriptions, message, State.Initial); }