From 2338c0ab189e666bfdef6cd7365c8fb2530555d9 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 4 Feb 2015 01:14:29 +0100 Subject: [PATCH] Removed 'MessagePublication' functionality, and now directly publishes messages --- .../mbassy/{bus => _misc}/BusFactory.java | 4 +- .../mbassy/bus/AbstractPubSubSupport.java | 46 +++++--- .../bus/AbstractSyncAsyncMessageBus.java | 32 ++--- .../mbassy/bus/{common => }/DeadMessage.java | 7 +- .../engio/mbassy/bus/IMessagePublication.java | 38 ------ .../java/net/engio/mbassy/bus/MBassador.java | 13 ++- .../engio/mbassy/bus/MessagePublication.java | 110 ------------------ .../net/engio/mbassy/bus/SyncMessageBus.java | 9 +- .../mbassy/bus/common/PubSubSupport.java | 10 +- .../net/engio/mbassy/bus/config/Feature.java | 33 +----- .../mbassy/bus/error/PublicationError.java | 28 +++-- .../mbassy/subscription/Subscription.java | 11 +- .../net/engio/mbassy/AsyncFIFOBusTest.java | 2 +- .../net/engio/mbassy/DeadMessageTest.java | 2 +- .../java/net/engio/mbassy/SyncBusTest.java | 2 +- .../engio/mbassy/SynchronizedHandlerTest.java | 29 +++-- .../engio/mbassy/common/MessageBusTest.java | 28 ----- 17 files changed, 103 insertions(+), 301 deletions(-) rename src/main/java/net/engio/mbassy/{bus => _misc}/BusFactory.java (92%) rename src/main/java/net/engio/mbassy/bus/{common => }/DeadMessage.java (69%) delete mode 100644 src/main/java/net/engio/mbassy/bus/IMessagePublication.java delete mode 100644 src/main/java/net/engio/mbassy/bus/MessagePublication.java diff --git a/src/main/java/net/engio/mbassy/bus/BusFactory.java b/src/main/java/net/engio/mbassy/_misc/BusFactory.java similarity index 92% rename from src/main/java/net/engio/mbassy/bus/BusFactory.java rename to src/main/java/net/engio/mbassy/_misc/BusFactory.java index 683a84c..e289f10 100644 --- a/src/main/java/net/engio/mbassy/bus/BusFactory.java +++ b/src/main/java/net/engio/mbassy/_misc/BusFactory.java @@ -1,5 +1,7 @@ -package net.engio.mbassy.bus; +package net.engio.mbassy._misc; +import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.SyncMessageBus; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.config.Feature; diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index d4da00e..5244ed2 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -5,7 +5,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import net.engio.mbassy.bus.common.DeadMessage; import net.engio.mbassy.bus.common.PubSubSupport; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; @@ -25,8 +24,6 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new ArrayList(); - private final MessagePublication.Factory publicationFactory; - private final SubscriptionManager subscriptionManager; @@ -34,14 +31,8 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { // configure the pub sub feature Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class); this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers()); - this.publicationFactory = pubSubFeature.getPublicationFactory(); } - protected MessagePublication.Factory getPublicationFactory() { - return this.publicationFactory; - } - - public Collection getRegisteredErrorHandlers() { return Collections.unmodifiableCollection(this.errorHandlers); } @@ -64,19 +55,48 @@ public abstract class AbstractPubSubSupport implements PubSubSupport { } } - protected IMessagePublication createMessagePublication(T message) { + protected void publishMessage(T message) { Class class1 = message.getClass(); Collection subscriptions = getSubscriptionsByMessageType(class1); - if ((subscriptions == null || subscriptions.isEmpty()) && !class1.equals(DeadMessage.class)) { + if (subscriptions == null || subscriptions.isEmpty()) { // Dead Event subscriptions = getSubscriptionsByMessageType(DeadMessage.class); - return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message)); + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(deadMessage); + } } else { - return getPublicationFactory().createPublication(this, subscriptions, message); + boolean delivered = false; + boolean success = false; + for (Subscription sub : subscriptions) { + delivered = sub.publishToSubscription(message); + if (delivered) { + success = true; + } + } + + // if the message did not have any listener/handler accept it + if (!success) { + if (!isDeadEvent(message)) { + // Dead Event + subscriptions = getSubscriptionsByMessageType(DeadMessage.class); + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(deadMessage); + } + } + } } } + private final boolean isDeadEvent(Object message) { + return DeadMessage.class.equals(message.getClass()); + } + + // obtain the set of subscriptions for the given message type // Note: never returns null! protected Collection getSubscriptionsByMessageType(Class messageType) { diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index 00a968b..6b38206 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import net.engio.mbassy.bus.common.IMessageBus; @@ -14,9 +15,6 @@ import net.engio.mbassy.bus.error.PublicationError; /** * The base class for all message bus implementations with support for asynchronous message dispatch - * - * @param - * @param

*/ public abstract class AbstractSyncAsyncMessageBus extends AbstractPubSubSupport implements IMessageBus { @@ -28,14 +26,13 @@ public abstract class AbstractSyncAsyncMessageBus private final List dispatchers; // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue pendingMessages; + private final BlockingQueue pendingMessages = new LinkedBlockingQueue(Integer.MAX_VALUE/16); protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) { super(configuration); // configure asynchronous message dispatch Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class); - this.pendingMessages = asyncDispatch.getPendingMessages(); this.dispatchers = new ArrayList(asyncDispatch.getNumberOfMessageDispatchers()); initDispatcherThreads(asyncDispatch); @@ -52,16 +49,16 @@ public abstract class AbstractSyncAsyncMessageBus Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() { @Override public void run() { + T message = null; while (true) { - IMessagePublication publication = null; try { - publication = AbstractSyncAsyncMessageBus.this.pendingMessages.take(); - publication.execute(); + message = AbstractSyncAsyncMessageBus.this.pendingMessages.take(); + publishMessage(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch(Throwable t){ - handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",publication)); + handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message)); } } } @@ -74,25 +71,20 @@ public abstract class AbstractSyncAsyncMessageBus // this method queues a message delivery request - protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) { + protected void addAsynchronousPublication(T message) { try { - this.pendingMessages.put(publication); - return publication.markScheduled(); + this.pendingMessages.put(message); } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication)); - return publication; + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); } } // this method queues a message delivery request - protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) { + protected void addAsynchronousPublication(T message, long timeout, TimeUnit unit) { try { - return this.pendingMessages.offer(publication, timeout, unit) - ? publication.markScheduled() - : publication; + this.pendingMessages.offer(message, timeout, unit); } catch (InterruptedException e) { - handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication)); - return publication; + handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message)); } } diff --git a/src/main/java/net/engio/mbassy/bus/common/DeadMessage.java b/src/main/java/net/engio/mbassy/bus/DeadMessage.java similarity index 69% rename from src/main/java/net/engio/mbassy/bus/common/DeadMessage.java rename to src/main/java/net/engio/mbassy/bus/DeadMessage.java index 9a73864..b197dc2 100644 --- a/src/main/java/net/engio/mbassy/bus/common/DeadMessage.java +++ b/src/main/java/net/engio/mbassy/bus/DeadMessage.java @@ -1,4 +1,6 @@ -package net.engio.mbassy.bus.common; +package net.engio.mbassy.bus; + +import net.engio.mbassy.bus.common.PublicationEvent; /** * The dead message event is published whenever no message @@ -9,8 +11,7 @@ package net.engio.mbassy.bus.common; */ public final class DeadMessage extends PublicationEvent { - public DeadMessage(Object message) { + DeadMessage(Object message) { super(message); } - } diff --git a/src/main/java/net/engio/mbassy/bus/IMessagePublication.java b/src/main/java/net/engio/mbassy/bus/IMessagePublication.java deleted file mode 100644 index ca2ed3b..0000000 --- a/src/main/java/net/engio/mbassy/bus/IMessagePublication.java +++ /dev/null @@ -1,38 +0,0 @@ -package net.engio.mbassy.bus; - -import net.engio.mbassy.subscription.Subscription; - -/** - * A message publication is created for each asynchronous message dispatch. It reflects the state - * of the corresponding message publication process, i.e. provides information whether the - * publication was successfully scheduled, is currently running etc. - *

- * A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to - * be used in multiple threads simultaneously . - * - * @author bennidi - * Date: 11/16/12 - */ -public interface IMessagePublication { - - public boolean add(Subscription subscription); - - /* - TODO: document state transitions - */ - public void execute(); - - public boolean isFinished(); - - public boolean isRunning(); - - public boolean isScheduled(); - - public void markDelivered(); - - public IMessagePublication markScheduled(); - - public boolean isDeadEvent(); - - public Object getMessage(); -} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index ee8868e..243d4ec 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -31,8 +31,7 @@ public class MBassador extends AbstractSyncAsyncMessageBus implements IMes @Override public void publish(T message) { try { - IMessagePublication publication = createMessagePublication(message); - publication.execute(); + publishMessage(message); } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") @@ -51,8 +50,9 @@ public class MBassador extends AbstractSyncAsyncMessageBus implements IMes * * @return A message publication that can be used to access information about it's state */ - public IMessagePublication publishAsync(T message) { - return addAsynchronousPublication(createMessagePublication(message)); + @Override + public void publishAsync(T message) { + addAsynchronousPublication(message); } @@ -66,7 +66,8 @@ public class MBassador extends AbstractSyncAsyncMessageBus implements IMes * * @return A message publication that wraps up the publication request */ - public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) { - return addAsynchronousPublication(createMessagePublication(message), timeout, unit); + @Override + public void publishAsync(T message, long timeout, TimeUnit unit) { + addAsynchronousPublication(message, timeout, unit); } } diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java deleted file mode 100644 index 7cc3591..0000000 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ /dev/null @@ -1,110 +0,0 @@ -package net.engio.mbassy.bus; - -import java.util.Collection; - -import net.engio.mbassy.bus.common.DeadMessage; -import net.engio.mbassy.bus.common.PubSubSupport; -import net.engio.mbassy.subscription.Subscription; - -/** - * A message publication is created for each asynchronous message dispatch. It reflects the state - * of the corresponding message publication process, i.e. provides information whether the - * publication was successfully scheduled, is currently running etc. - *

- * A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to - * be used in multiple threads simultaneously . - * - * @author bennidi - * Date: 11/16/12 - */ -public class MessagePublication implements IMessagePublication { - - private final Collection subscriptions; - private final Object message; - // message publications can be referenced by multiple threads to query publication progress - private volatile State state = State.Initial; - private volatile boolean delivered = false; - private PubSubSupport pubSub; - - protected MessagePublication(PubSubSupport pubSub, Collection subscriptions, Object message, State initialState) { - this.pubSub = pubSub; - this.subscriptions = subscriptions; - this.message = message; - this.state = initialState; - } - - @Override - public boolean add(Subscription subscription) { - return this.subscriptions.add(subscription); - } - - /* - TODO: document state transitions - */ - @Override - public void execute() { - this.state = State.Running; - for (Subscription sub : this.subscriptions) { - sub.publish(this, this.message); - } - this.state = State.Finished; - // if the message has not been marked delivered by the dispatcher - if (!this.delivered) { - if (!isDeadEvent()) { - this.pubSub.publish(new DeadMessage(this.message)); - } - } - } - - @Override - public boolean isFinished() { - return this.state.equals(State.Finished); - } - - @Override - public boolean isRunning() { - return this.state.equals(State.Running); - } - - @Override - public boolean isScheduled() { - return this.state.equals(State.Scheduled); - } - - @Override - public void markDelivered() { - this.delivered = true; - } - - @Override - public MessagePublication markScheduled() { - if (this.state.equals(State.Initial)) { - this.state = State.Scheduled; - } - return this; - } - - - @Override - public boolean isDeadEvent() { - return DeadMessage.class.equals(this.message.getClass()); - } - - @Override - public Object getMessage() { - return this.message; - } - - private enum State { - Initial, Scheduled, Running, Finished, Error - } - - public static class Factory { - - public IMessagePublication createPublication(PubSubSupport pubSub, Collection subscriptions, Object message) { - return new MessagePublication(pubSub, subscriptions, message, State.Initial); - } - - } - -} diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index d1f9139..f1246fe 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -22,8 +22,7 @@ public class SyncMessageBus extends AbstractPubSubSupport implements IMess @Override public void publish(T message) { try { - IMessagePublication publication = createMessagePublication(message); - publication.execute(); + publishMessage(message); } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") @@ -33,15 +32,13 @@ public class SyncMessageBus extends AbstractPubSubSupport implements IMess } @Override - public IMessagePublication publishAsync(T message) { + public void publishAsync(T message) { publish(message); - return null; } @Override - public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) { + public void publishAsync(T message, long timeout, TimeUnit unit) { publish(message); - return null; } @Override diff --git a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java index 4c85e7c..8b5df84 100644 --- a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java @@ -2,8 +2,6 @@ package net.engio.mbassy.bus.common; import java.util.concurrent.TimeUnit; -import net.engio.mbassy.bus.IMessagePublication; - /** * This interface defines the very basic message publication semantics according to the publish subscribe pattern. * Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its @@ -50,10 +48,8 @@ public interface PubSubSupport { *

* If an unbound queuing strategy is used the call returns immediately. * If a bounded queue is used the call might block until the message can be placed in the queue. - * - * @return A message publication that can be used to access information about it's state */ - IMessagePublication publishAsync(T message); + void publishAsync(T message); /** * Execute the message publication asynchronously. The behaviour of this method depends on the @@ -62,8 +58,6 @@ public interface PubSubSupport { * If an unbound queuing strategy is used the call returns immediately. * If a bounded queue is used the call will block until the message can be placed in the queue * or the timeout is reached. - * - * @return A message publication that wraps up the publication request */ - IMessagePublication publishAsync(T message, long timeout, TimeUnit unit); + void publishAsync(T message, long timeout, TimeUnit unit); } diff --git a/src/main/java/net/engio/mbassy/bus/config/Feature.java b/src/main/java/net/engio/mbassy/bus/config/Feature.java index 76286b7..72e8e70 100644 --- a/src/main/java/net/engio/mbassy/bus/config/Feature.java +++ b/src/main/java/net/engio/mbassy/bus/config/Feature.java @@ -1,6 +1,5 @@ package net.engio.mbassy.bus.config; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -9,8 +8,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.IMessagePublication; -import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; /** @@ -26,11 +23,9 @@ public interface Feature { public static final SyncPubSub Default(){ return new SyncPubSub() - .setMetadataReader(new MetadataReader()) - .setPublicationFactory(new MessagePublication.Factory()); + .setMetadataReader(new MetadataReader()); } - private MessagePublication.Factory publicationFactory; private MetadataReader metadataReader; @@ -42,20 +37,6 @@ public interface Feature { this.metadataReader = metadataReader; return this; } - - /** - * The message publication factory is used to wrap a published message - * in a {@link MessagePublication} for processing. - * @return The factory to be used by the bus to create the publications - */ - public MessagePublication.Factory getPublicationFactory() { - return publicationFactory; - } - - public SyncPubSub setPublicationFactory(MessagePublication.Factory publicationFactory) { - this.publicationFactory = publicationFactory; - return this; - } } class AsynchronousHandlerInvocation implements Feature { @@ -114,13 +95,11 @@ public interface Feature { public static final AsynchronousMessageDispatch Default(){ return new AsynchronousMessageDispatch() .setNumberOfMessageDispatchers(2) - .setDispatcherThreadFactory(MessageDispatchThreadFactory) - .setMessageQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)); + .setDispatcherThreadFactory(MessageDispatchThreadFactory); } private int numberOfMessageDispatchers; - private BlockingQueue pendingMessages; private ThreadFactory dispatcherThreadFactory; public int getNumberOfMessageDispatchers() { @@ -132,14 +111,6 @@ public interface Feature { return this; } - public BlockingQueue getPendingMessages() { - return pendingMessages; - } - - public AsynchronousMessageDispatch setMessageQueue(BlockingQueue pendingMessages) { - this.pendingMessages = pendingMessages; - return this; - } public ThreadFactory getDispatcherThreadFactory() { return dispatcherThreadFactory; diff --git a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java index dc98555..03013ff 100644 --- a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java +++ b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java @@ -1,7 +1,5 @@ package net.engio.mbassy.bus.error; -import net.engio.mbassy.bus.IMessagePublication; - import java.lang.reflect.Method; /** @@ -48,10 +46,10 @@ public class PublicationError{ public PublicationError(final Throwable cause, final String message, - final IMessagePublication publication) { + final Object messageObject) { this.cause = cause; this.message = message; - this.publishedObject = publication != null ? publication.getMessage() : null; + this.publishedObject = messageObject; } @@ -68,7 +66,7 @@ public class PublicationError{ * @return The Throwable giving rise to this PublicationError. */ public Throwable getCause() { - return cause; + return this.cause; } /** @@ -83,7 +81,7 @@ public class PublicationError{ } public String getMessage() { - return message; + return this.message; } public PublicationError setMessage(String message) { @@ -92,7 +90,7 @@ public class PublicationError{ } public Method getHandler() { - return handler; + return this.handler; } public PublicationError setHandler(Method handler) { @@ -101,7 +99,7 @@ public class PublicationError{ } public Object getListener() { - return listener; + return this.listener; } public PublicationError setListener(Object listener) { @@ -110,7 +108,7 @@ public class PublicationError{ } public Object getPublishedObject() { - return publishedObject; + return this.publishedObject; } public PublicationError setPublishedObject(Object publishedObject) { @@ -123,18 +121,18 @@ public class PublicationError{ */ @Override public String toString() { - String newLine = System.getProperty("line.separator"); + String newLine = System.getProperty("line.separator"); return "PublicationError{" + newLine + - "\tcause=" + cause + + "\tcause=" + this.cause + newLine + - "\tmessage='" + message + '\'' + + "\tmessage='" + this.message + '\'' + newLine + - "\thandler=" + handler + + "\thandler=" + this.handler + newLine + - "\tlistener=" + listener + + "\tlistener=" + this.listener + newLine + - "\tpublishedObject=" + publishedObject + + "\tpublishedObject=" + this.publishedObject + '}'; } } diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index e91c0e8..594a0d3 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -4,7 +4,6 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collection; -import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.common.IConcurrentSet; @@ -76,9 +75,11 @@ public class Subscription { return this.handlerMetadata.getHandledMessages(); } - public void publish(IMessagePublication publication, Object message){ + /** + * @return TRUE if there were listeners/handlers available to publish to + */ + public boolean publishToSubscription(Object message){ if (this.listeners.size() > 0) { - publication.markDelivered(); /** * Delivers the given message to the given set of listeners. @@ -113,7 +114,11 @@ public class Subscription { handler, listener, message)); } } + + return true; } + + return false; } diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java index eb6feee..277d02a 100644 --- a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -3,8 +3,8 @@ package net.engio.mbassy; import java.util.LinkedList; import java.util.List; +import net.engio.mbassy._misc.BusFactory; import net.engio.mbassy.annotations.Handler; -import net.engio.mbassy.bus.BusFactory; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index 65e8dd7..aea1035 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -3,8 +3,8 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.annotations.Handler; +import net.engio.mbassy.bus.DeadMessage; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.common.DeadMessage; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.ListenerFactory; import net.engio.mbassy.common.MessageBusTest; diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index 1069aa1..c06119c 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -2,7 +2,7 @@ package net.engio.mbassy; import java.util.concurrent.atomic.AtomicInteger; -import net.engio.mbassy.bus.BusFactory; +import net.engio.mbassy._misc.BusFactory; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.error.IPublicationErrorHandler; diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index 1e15302..4cd81fc 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -2,10 +2,10 @@ package net.engio.mbassy; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Synchronized; -import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.bus.config.IBusConfiguration; @@ -21,8 +21,7 @@ import org.junit.Test; */ public class SynchronizedHandlerTest extends MessageBusTest { - - private static int incrementsPerMessage = 10000; + private static AtomicInteger counter = new AtomicInteger(0); private static int numberOfMessages = 1000; private static int numberOfListeners = 1000; @@ -30,8 +29,8 @@ public class SynchronizedHandlerTest extends MessageBusTest { public void testSynchronizedWithSynchronousInvocation(){ List handlers = new LinkedList(); IBusConfiguration config = SyncAsync(); - config.getFeature(Feature.AsynchronousMessageDispatch.class) - .setNumberOfMessageDispatchers(6); + config.getFeature(Feature.AsynchronousMessageDispatch.class).setNumberOfMessageDispatchers(6); + IMessageBus bus = createBus(config); for(int i = 0; i < numberOfListeners; i++){ SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery(); @@ -39,31 +38,29 @@ public class SynchronizedHandlerTest extends MessageBusTest { bus.subscribe(handler); } - IMessagePublication publication = null; for(int i = 0; i < numberOfMessages; i++){ - publication = bus.publishAsync(new Object()); + bus.publishAsync(new Object()); } + + int totalCount = numberOfListeners * numberOfMessages; + int expireCount = 1000; + // wait for last publication - while (!publication.isFinished()){ + while (expireCount-- > 0 && counter.get() < totalCount){ pause(100); } - for(SynchronizedWithSynchronousDelivery handler : handlers){ - assertEquals(incrementsPerMessage * numberOfMessages, handler.counter); + if (expireCount <= 0) { + fail("Count '" + counter.get() + "' was incorrect!"); } - } public static class SynchronizedWithSynchronousDelivery { - private int counter = 0; - @Handler @Synchronized public void handleMessage(Object o){ - for(int i = 0; i < incrementsPerMessage; i++){ - this.counter++; - } + counter.getAndIncrement(); } } diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index 84a5eee..2b4bf9b 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -1,9 +1,6 @@ package net.engio.mbassy.common; -import java.util.concurrent.ConcurrentHashMap; - import junit.framework.Assert; -import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.config.BusConfiguration; import net.engio.mbassy.bus.config.Feature; @@ -40,11 +37,9 @@ public abstract class MessageBusTest extends AssertSupport { }; private static final Object mapObject = new Object(); - private ConcurrentHashMap issuedPublications = new ConcurrentHashMap(); @Before public void setUp(){ - this.issuedPublications = new ConcurrentHashMap(); for(MessageTypes mes : MessageTypes.values()) { mes.reset(); } @@ -69,27 +64,4 @@ public abstract class MessageBusTest extends AssertSupport { ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); return bus; } - - protected void track(IMessagePublication asynchronously) { - this.issuedPublications.put(asynchronously, mapObject); - } - - public void waitForPublications(long timeOutInMs){ - long start = System.currentTimeMillis(); - while(this.issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){ - for(IMessagePublication pub : this.issuedPublications.keySet()){ - if(pub.isFinished()) { - this.issuedPublications.remove(pub); - } - } - } - if(this.issuedPublications.size() > 0) { - fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms"); - } - } - - public void addPublication(IMessagePublication publication){ - this.issuedPublications.put(publication, mapObject); - } - }