Merge pull request #91 from dorkbox/master

Extend the message publication factory to return an interface type instead of class type to allow for easier extension of message publication behaviour
This commit is contained in:
Benjamin Diedrichsen 2014-12-18 12:40:20 +01:00
commit f509bfe57f
18 changed files with 87 additions and 43 deletions

3
.gitignore vendored
View File

@ -13,8 +13,11 @@
# root of compiled classes #
target/**/*
target/**
classes/
# the local maven repository #
lib/
mvn-local-repo/**/*
release.properties
/.classpath

View File

@ -38,7 +38,7 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
// configure the pub sub feature
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider()
.createManager(pubSubFeature.getMetadataReader(),
.createManager(pubSubFeature.getMetadataReader(),
pubSubFeature.getSubscriptionFactory(), runtime);
this.publicationFactory = pubSubFeature.getPublicationFactory();
}
@ -73,7 +73,7 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
return runtime;
}
protected MessagePublication createMessagePublication(T message) {
protected IMessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
// Dead Event

View File

@ -29,7 +29,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
private final List<Thread> dispatchers;
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication> pendingMessages;
private final BlockingQueue<IMessagePublication> pendingMessages;
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
super(configuration);
@ -55,7 +55,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
public void run() {
while (true) {
MessagePublication publication = null;
IMessagePublication publication = null;
try {
publication = pendingMessages.take();
publication.execute();
@ -76,7 +76,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
// this method queues a message delivery request
protected MessagePublication addAsynchronousPublication(MessagePublication publication) {
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) {
try {
pendingMessages.put(publication);
return publication.markScheduled();
@ -87,7 +87,7 @@ public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublica
}
// this method queues a message delivery request
protected MessagePublication addAsynchronousPublication(MessagePublication publication, long timeout, TimeUnit unit) {
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(publication, timeout, unit)
? publication.markScheduled()

View File

@ -0,0 +1,40 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.subscription.Subscription;
/**
* A message publication is created for each asynchronous message dispatch. It reflects the state
* of the corresponding message publication process, i.e. provides information whether the
* publication was successfully scheduled, is currently running etc.
* <p/>
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
* be used in multiple threads simultaneously .
*
* @author bennidi
* Date: 11/16/12
*/
public interface IMessagePublication {
public boolean add(Subscription subscription);
/*
TODO: document state transitions
*/
public void execute();
public boolean isFinished();
public boolean isRunning();
public boolean isScheduled();
public void markDelivered();
public IMessagePublication markScheduled();
public boolean isDeadEvent();
public boolean isFilteredEvent();
public Object getMessage();
}

View File

@ -24,11 +24,11 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
}
public MessagePublication publishAsync(T message) {
public IMessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));
}
public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
}
@ -41,7 +41,7 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
*/
public void publish(T message) {
try {
MessagePublication publication = createMessagePublication(message);
IMessagePublication publication = createMessagePublication(message);
publication.execute();
} catch (Throwable e) {
handlePublicationError(new PublicationError()

View File

@ -17,7 +17,7 @@ import java.util.Collection;
* @author bennidi
* Date: 11/16/12
*/
public class MessagePublication {
public class MessagePublication implements IMessagePublication {
private final Collection<Subscription> subscriptions;
private final Object message;
@ -40,7 +40,7 @@ public class MessagePublication {
/*
TODO: document state transitions
*/
protected void execute() {
public void execute() {
state = State.Running;
for (Subscription sub : subscriptions) {
sub.publish(this, message);
@ -99,7 +99,7 @@ public class MessagePublication {
public static class Factory {
public MessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
public IMessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(runtime, subscriptions, message, State.Initial);
}

View File

@ -22,7 +22,7 @@ public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements PubSu
@Override
public void publish(T message) {
try {
MessagePublication publication = createMessagePublication(message);
IMessagePublication publication = createMessagePublication(message);
publication.execute();
} catch (Throwable e) {
handlePublicationError(new PublicationError()

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
@ -132,12 +133,12 @@ public interface Feature {
return new AsynchronousMessageDispatch()
.setNumberOfMessageDispatchers(2)
.setDispatcherThreadFactory(MessageDispatchThreadFactory)
.setMessageQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
.setMessageQueue(new LinkedBlockingQueue<IMessagePublication>(Integer.MAX_VALUE));
}
private int numberOfMessageDispatchers;
private BlockingQueue<MessagePublication> pendingMessages;
private BlockingQueue<IMessagePublication> pendingMessages;
private ThreadFactory dispatcherThreadFactory;
public int getNumberOfMessageDispatchers() {
@ -149,11 +150,11 @@ public interface Feature {
return this;
}
public BlockingQueue<MessagePublication> getPendingMessages() {
public BlockingQueue<IMessagePublication> getPendingMessages() {
return pendingMessages;
}
public AsynchronousMessageDispatch setMessageQueue(BlockingQueue<MessagePublication> pendingMessages) {
public AsynchronousMessageDispatch setMessageQueue(BlockingQueue<IMessagePublication> pendingMessages) {
this.pendingMessages = pendingMessages;
return this;
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.bus.error;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import java.lang.reflect.Method;
@ -48,7 +48,7 @@ public class PublicationError{
public PublicationError(final Throwable cause,
final String message,
final MessagePublication publication) {
final IMessagePublication publication) {
this.cause = cause;
this.message = message;
this.publishedObject = publication != null ? publication.getMessage() : null;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import java.util.concurrent.TimeUnit;
@ -19,7 +19,7 @@ public interface ISyncAsyncPublicationCommand extends IPublicationCommand {
*
* @return A message publication that can be used to access information about the state of
*/
MessagePublication asynchronously();
IMessagePublication asynchronously();
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
@ -31,5 +31,5 @@ public interface ISyncAsyncPublicationCommand extends IPublicationCommand {
*
* @return A message publication that wraps up the publication request
*/
MessagePublication asynchronously(long timeout, TimeUnit unit);
IMessagePublication asynchronously(long timeout, TimeUnit unit);
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.bus.publication;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import java.util.concurrent.TimeUnit;
@ -27,12 +27,12 @@ public class SyncAsyncPostCommand<T> implements ISyncAsyncPublicationCommand {
}
@Override
public MessagePublication asynchronously() {
public IMessagePublication asynchronously() {
return mBassador.publishAsync(message);
}
@Override
public MessagePublication asynchronously(long timeout, TimeUnit unit) {
public IMessagePublication asynchronously(long timeout, TimeUnit unit) {
return mBassador.publishAsync(message, timeout, unit);
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.MessageEnvelope;
/**
@ -20,7 +20,7 @@ public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher {
}
@Override
public void dispatch(MessagePublication publication, Object message, Iterable listeners){
public void dispatch(IMessagePublication publication, Object message, Iterable listeners){
getDelegate().dispatch(publication, new MessageEnvelope(message), listeners);
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.listener.IMessageFilter;
/**
@ -36,7 +36,7 @@ public final class FilteredMessageDispatcher extends DelegatingMessageDispatcher
@Override
public void dispatch(MessagePublication publication, Object message, Iterable listeners){
public void dispatch(IMessagePublication publication, Object message, Iterable listeners){
if (passesFilter(message)) {
getDelegate().dispatch(publication, message, listeners);
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.ISubscriptionContextAware;
/**
@ -29,7 +29,7 @@ public interface IMessageDispatcher extends ISubscriptionContextAware {
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
void dispatch(MessagePublication publication, Object message, Iterable listeners);
void dispatch(IMessagePublication publication, Object message, Iterable listeners);
/**
* Get the handler invocation that will be used to deliver the

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
@ -24,7 +24,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
}
@Override
public void dispatch(final MessagePublication publication, final Object message, final Iterable listeners){
public void dispatch(final IMessagePublication publication, final Object message, final Iterable listeners){
publication.markDelivered();
for (Object listener : listeners) {
getInvocation().invoke(listener, message);

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
@ -68,7 +68,7 @@ public class Subscription {
}
public void publish(MessagePublication publication, Object message){
public void publish(IMessagePublication publication, Object message){
if(listeners.size() > 0)
dispatcher.dispatch(publication, message, listeners);
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
@ -39,7 +39,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
bus.subscribe(handler);
}
MessagePublication publication = null;
IMessagePublication publication = null;
for(int i = 0; i < numberOfMessages; i++){
publication = bus.post(new Object()).asynchronously();
}

View File

@ -2,7 +2,7 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.IMessagePublication;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
@ -37,11 +37,11 @@ public abstract class MessageBusTest extends AssertSupport {
};
private StrongConcurrentSet<MessagePublication> issuedPublications = new StrongConcurrentSet<MessagePublication>();
private StrongConcurrentSet<IMessagePublication> issuedPublications = new StrongConcurrentSet<IMessagePublication>();
@Before
public void setUp(){
issuedPublications = new StrongConcurrentSet<MessagePublication>();
issuedPublications = new StrongConcurrentSet<IMessagePublication>();
for(MessageTypes mes : MessageTypes.values())
mes.reset();
}
@ -66,14 +66,14 @@ public abstract class MessageBusTest extends AssertSupport {
return bus;
}
protected void track(MessagePublication asynchronously) {
protected void track(IMessagePublication asynchronously) {
issuedPublications.add(asynchronously);
}
public void waitForPublications(long timeOutInMs){
long start = System.currentTimeMillis();
while(issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){
for(MessagePublication pub : issuedPublications){
for(IMessagePublication pub : issuedPublications){
if(pub.isFinished())
issuedPublications.remove(pub);
}
@ -82,7 +82,7 @@ public abstract class MessageBusTest extends AssertSupport {
fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms");
}
public void addPublication(MessagePublication publication){
public void addPublication(IMessagePublication publication){
issuedPublications.add(publication);
}