Removed 'MessagePublication' functionality, and now directly publishes messages
This commit is contained in:
parent
8ed29f6b7f
commit
2338c0ab18
@ -1,5 +1,7 @@
|
|||||||
package net.engio.mbassy.bus;
|
package net.engio.mbassy._misc;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.MBassador;
|
||||||
|
import net.engio.mbassy.bus.SyncMessageBus;
|
||||||
import net.engio.mbassy.bus.common.IMessageBus;
|
import net.engio.mbassy.bus.common.IMessageBus;
|
||||||
import net.engio.mbassy.bus.config.BusConfiguration;
|
import net.engio.mbassy.bus.config.BusConfiguration;
|
||||||
import net.engio.mbassy.bus.config.Feature;
|
import net.engio.mbassy.bus.config.Feature;
|
@ -5,7 +5,6 @@ import java.util.Collection;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.common.DeadMessage;
|
|
||||||
import net.engio.mbassy.bus.common.PubSubSupport;
|
import net.engio.mbassy.bus.common.PubSubSupport;
|
||||||
import net.engio.mbassy.bus.config.Feature;
|
import net.engio.mbassy.bus.config.Feature;
|
||||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||||
@ -25,8 +24,6 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
|
|||||||
// this handler will receive all errors that occur during message dispatch or message handling
|
// this handler will receive all errors that occur during message dispatch or message handling
|
||||||
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
|
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
|
||||||
|
|
||||||
private final MessagePublication.Factory publicationFactory;
|
|
||||||
|
|
||||||
private final SubscriptionManager subscriptionManager;
|
private final SubscriptionManager subscriptionManager;
|
||||||
|
|
||||||
|
|
||||||
@ -34,14 +31,8 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
|
|||||||
// configure the pub sub feature
|
// configure the pub sub feature
|
||||||
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
|
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
|
||||||
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers());
|
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers());
|
||||||
this.publicationFactory = pubSubFeature.getPublicationFactory();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MessagePublication.Factory getPublicationFactory() {
|
|
||||||
return this.publicationFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
|
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
|
||||||
return Collections.unmodifiableCollection(this.errorHandlers);
|
return Collections.unmodifiableCollection(this.errorHandlers);
|
||||||
}
|
}
|
||||||
@ -64,19 +55,48 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected IMessagePublication createMessagePublication(T message) {
|
protected void publishMessage(T message) {
|
||||||
Class<? extends Object> class1 = message.getClass();
|
Class<? extends Object> class1 = message.getClass();
|
||||||
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(class1);
|
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(class1);
|
||||||
|
|
||||||
if ((subscriptions == null || subscriptions.isEmpty()) && !class1.equals(DeadMessage.class)) {
|
if (subscriptions == null || subscriptions.isEmpty()) {
|
||||||
// Dead Event
|
// Dead Event
|
||||||
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
||||||
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
|
DeadMessage deadMessage = new DeadMessage(message);
|
||||||
|
|
||||||
|
for (Subscription sub : subscriptions) {
|
||||||
|
sub.publishToSubscription(deadMessage);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return getPublicationFactory().createPublication(this, subscriptions, message);
|
boolean delivered = false;
|
||||||
|
boolean success = false;
|
||||||
|
for (Subscription sub : subscriptions) {
|
||||||
|
delivered = sub.publishToSubscription(message);
|
||||||
|
if (delivered) {
|
||||||
|
success = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if the message did not have any listener/handler accept it
|
||||||
|
if (!success) {
|
||||||
|
if (!isDeadEvent(message)) {
|
||||||
|
// Dead Event
|
||||||
|
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
|
||||||
|
DeadMessage deadMessage = new DeadMessage(message);
|
||||||
|
|
||||||
|
for (Subscription sub : subscriptions) {
|
||||||
|
sub.publishToSubscription(deadMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final boolean isDeadEvent(Object message) {
|
||||||
|
return DeadMessage.class.equals(message.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// obtain the set of subscriptions for the given message type
|
// obtain the set of subscriptions for the given message type
|
||||||
// Note: never returns null!
|
// Note: never returns null!
|
||||||
protected Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
|
protected Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
|
||||||
|
@ -5,6 +5,7 @@ import java.util.List;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.common.IMessageBus;
|
import net.engio.mbassy.bus.common.IMessageBus;
|
||||||
@ -14,9 +15,6 @@ import net.engio.mbassy.bus.error.PublicationError;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* The base class for all message bus implementations with support for asynchronous message dispatch
|
* The base class for all message bus implementations with support for asynchronous message dispatch
|
||||||
*
|
|
||||||
* @param <T>
|
|
||||||
* @param <P>
|
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractSyncAsyncMessageBus<T>
|
public abstract class AbstractSyncAsyncMessageBus<T>
|
||||||
extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
extends AbstractPubSubSupport<T> implements IMessageBus<T> {
|
||||||
@ -28,14 +26,13 @@ public abstract class AbstractSyncAsyncMessageBus<T>
|
|||||||
private final List<Thread> dispatchers;
|
private final List<Thread> dispatchers;
|
||||||
|
|
||||||
// all pending messages scheduled for asynchronous dispatch are queued here
|
// all pending messages scheduled for asynchronous dispatch are queued here
|
||||||
private final BlockingQueue<IMessagePublication> pendingMessages;
|
private final BlockingQueue<T> pendingMessages = new LinkedBlockingQueue<T>(Integer.MAX_VALUE/16);
|
||||||
|
|
||||||
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
|
protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
|
||||||
super(configuration);
|
super(configuration);
|
||||||
|
|
||||||
// configure asynchronous message dispatch
|
// configure asynchronous message dispatch
|
||||||
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
|
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
|
||||||
this.pendingMessages = asyncDispatch.getPendingMessages();
|
|
||||||
this.dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
|
this.dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
|
||||||
initDispatcherThreads(asyncDispatch);
|
initDispatcherThreads(asyncDispatch);
|
||||||
|
|
||||||
@ -52,16 +49,16 @@ public abstract class AbstractSyncAsyncMessageBus<T>
|
|||||||
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
|
Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
T message = null;
|
||||||
while (true) {
|
while (true) {
|
||||||
IMessagePublication publication = null;
|
|
||||||
try {
|
try {
|
||||||
publication = AbstractSyncAsyncMessageBus.this.pendingMessages.take();
|
message = AbstractSyncAsyncMessageBus.this.pendingMessages.take();
|
||||||
publication.execute();
|
publishMessage(message);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
return;
|
return;
|
||||||
} catch(Throwable t){
|
} catch(Throwable t){
|
||||||
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch",publication));
|
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -74,25 +71,20 @@ public abstract class AbstractSyncAsyncMessageBus<T>
|
|||||||
|
|
||||||
|
|
||||||
// this method queues a message delivery request
|
// this method queues a message delivery request
|
||||||
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) {
|
protected void addAsynchronousPublication(T message) {
|
||||||
try {
|
try {
|
||||||
this.pendingMessages.put(publication);
|
this.pendingMessages.put(message);
|
||||||
return publication.markScheduled();
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
|
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||||
return publication;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this method queues a message delivery request
|
// this method queues a message delivery request
|
||||||
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) {
|
protected void addAsynchronousPublication(T message, long timeout, TimeUnit unit) {
|
||||||
try {
|
try {
|
||||||
return this.pendingMessages.offer(publication, timeout, unit)
|
this.pendingMessages.offer(message, timeout, unit);
|
||||||
? publication.markScheduled()
|
|
||||||
: publication;
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
|
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
|
||||||
return publication;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
package net.engio.mbassy.bus.common;
|
package net.engio.mbassy.bus;
|
||||||
|
|
||||||
|
import net.engio.mbassy.bus.common.PublicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The dead message event is published whenever no message
|
* The dead message event is published whenever no message
|
||||||
@ -9,8 +11,7 @@ package net.engio.mbassy.bus.common;
|
|||||||
*/
|
*/
|
||||||
public final class DeadMessage extends PublicationEvent {
|
public final class DeadMessage extends PublicationEvent {
|
||||||
|
|
||||||
public DeadMessage(Object message) {
|
DeadMessage(Object message) {
|
||||||
super(message);
|
super(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -1,38 +0,0 @@
|
|||||||
package net.engio.mbassy.bus;
|
|
||||||
|
|
||||||
import net.engio.mbassy.subscription.Subscription;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A message publication is created for each asynchronous message dispatch. It reflects the state
|
|
||||||
* of the corresponding message publication process, i.e. provides information whether the
|
|
||||||
* publication was successfully scheduled, is currently running etc.
|
|
||||||
* <p/>
|
|
||||||
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
|
|
||||||
* be used in multiple threads simultaneously .
|
|
||||||
*
|
|
||||||
* @author bennidi
|
|
||||||
* Date: 11/16/12
|
|
||||||
*/
|
|
||||||
public interface IMessagePublication {
|
|
||||||
|
|
||||||
public boolean add(Subscription subscription);
|
|
||||||
|
|
||||||
/*
|
|
||||||
TODO: document state transitions
|
|
||||||
*/
|
|
||||||
public void execute();
|
|
||||||
|
|
||||||
public boolean isFinished();
|
|
||||||
|
|
||||||
public boolean isRunning();
|
|
||||||
|
|
||||||
public boolean isScheduled();
|
|
||||||
|
|
||||||
public void markDelivered();
|
|
||||||
|
|
||||||
public IMessagePublication markScheduled();
|
|
||||||
|
|
||||||
public boolean isDeadEvent();
|
|
||||||
|
|
||||||
public Object getMessage();
|
|
||||||
}
|
|
@ -31,8 +31,7 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T> implements IMes
|
|||||||
@Override
|
@Override
|
||||||
public void publish(T message) {
|
public void publish(T message) {
|
||||||
try {
|
try {
|
||||||
IMessagePublication publication = createMessagePublication(message);
|
publishMessage(message);
|
||||||
publication.execute();
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
handlePublicationError(new PublicationError()
|
handlePublicationError(new PublicationError()
|
||||||
.setMessage("Error during publication of message")
|
.setMessage("Error during publication of message")
|
||||||
@ -51,8 +50,9 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T> implements IMes
|
|||||||
*
|
*
|
||||||
* @return A message publication that can be used to access information about it's state
|
* @return A message publication that can be used to access information about it's state
|
||||||
*/
|
*/
|
||||||
public IMessagePublication publishAsync(T message) {
|
@Override
|
||||||
return addAsynchronousPublication(createMessagePublication(message));
|
public void publishAsync(T message) {
|
||||||
|
addAsynchronousPublication(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -66,7 +66,8 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T> implements IMes
|
|||||||
*
|
*
|
||||||
* @return A message publication that wraps up the publication request
|
* @return A message publication that wraps up the publication request
|
||||||
*/
|
*/
|
||||||
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
|
@Override
|
||||||
return addAsynchronousPublication(createMessagePublication(message), timeout, unit);
|
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||||
|
addAsynchronousPublication(message, timeout, unit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,110 +0,0 @@
|
|||||||
package net.engio.mbassy.bus;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import net.engio.mbassy.bus.common.DeadMessage;
|
|
||||||
import net.engio.mbassy.bus.common.PubSubSupport;
|
|
||||||
import net.engio.mbassy.subscription.Subscription;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A message publication is created for each asynchronous message dispatch. It reflects the state
|
|
||||||
* of the corresponding message publication process, i.e. provides information whether the
|
|
||||||
* publication was successfully scheduled, is currently running etc.
|
|
||||||
* <p/>
|
|
||||||
* A message publication lives within a single thread. It is not designed in a thread-safe manner -> not eligible to
|
|
||||||
* be used in multiple threads simultaneously .
|
|
||||||
*
|
|
||||||
* @author bennidi
|
|
||||||
* Date: 11/16/12
|
|
||||||
*/
|
|
||||||
public class MessagePublication implements IMessagePublication {
|
|
||||||
|
|
||||||
private final Collection<Subscription> subscriptions;
|
|
||||||
private final Object message;
|
|
||||||
// message publications can be referenced by multiple threads to query publication progress
|
|
||||||
private volatile State state = State.Initial;
|
|
||||||
private volatile boolean delivered = false;
|
|
||||||
private PubSubSupport pubSub;
|
|
||||||
|
|
||||||
protected MessagePublication(PubSubSupport pubSub, Collection<Subscription> subscriptions, Object message, State initialState) {
|
|
||||||
this.pubSub = pubSub;
|
|
||||||
this.subscriptions = subscriptions;
|
|
||||||
this.message = message;
|
|
||||||
this.state = initialState;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean add(Subscription subscription) {
|
|
||||||
return this.subscriptions.add(subscription);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
TODO: document state transitions
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void execute() {
|
|
||||||
this.state = State.Running;
|
|
||||||
for (Subscription sub : this.subscriptions) {
|
|
||||||
sub.publish(this, this.message);
|
|
||||||
}
|
|
||||||
this.state = State.Finished;
|
|
||||||
// if the message has not been marked delivered by the dispatcher
|
|
||||||
if (!this.delivered) {
|
|
||||||
if (!isDeadEvent()) {
|
|
||||||
this.pubSub.publish(new DeadMessage(this.message));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isFinished() {
|
|
||||||
return this.state.equals(State.Finished);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRunning() {
|
|
||||||
return this.state.equals(State.Running);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isScheduled() {
|
|
||||||
return this.state.equals(State.Scheduled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void markDelivered() {
|
|
||||||
this.delivered = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MessagePublication markScheduled() {
|
|
||||||
if (this.state.equals(State.Initial)) {
|
|
||||||
this.state = State.Scheduled;
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDeadEvent() {
|
|
||||||
return DeadMessage.class.equals(this.message.getClass());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object getMessage() {
|
|
||||||
return this.message;
|
|
||||||
}
|
|
||||||
|
|
||||||
private enum State {
|
|
||||||
Initial, Scheduled, Running, Finished, Error
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory {
|
|
||||||
|
|
||||||
public IMessagePublication createPublication(PubSubSupport pubSub, Collection<Subscription> subscriptions, Object message) {
|
|
||||||
return new MessagePublication(pubSub, subscriptions, message, State.Initial);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -22,8 +22,7 @@ public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements IMess
|
|||||||
@Override
|
@Override
|
||||||
public void publish(T message) {
|
public void publish(T message) {
|
||||||
try {
|
try {
|
||||||
IMessagePublication publication = createMessagePublication(message);
|
publishMessage(message);
|
||||||
publication.execute();
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
handlePublicationError(new PublicationError()
|
handlePublicationError(new PublicationError()
|
||||||
.setMessage("Error during publication of message")
|
.setMessage("Error during publication of message")
|
||||||
@ -33,15 +32,13 @@ public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements IMess
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IMessagePublication publishAsync(T message) {
|
public void publishAsync(T message) {
|
||||||
publish(message);
|
publish(message);
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
|
public void publishAsync(T message, long timeout, TimeUnit unit) {
|
||||||
publish(message);
|
publish(message);
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2,8 +2,6 @@ package net.engio.mbassy.bus.common;
|
|||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface defines the very basic message publication semantics according to the publish subscribe pattern.
|
* This interface defines the very basic message publication semantics according to the publish subscribe pattern.
|
||||||
* Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its
|
* Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its
|
||||||
@ -50,10 +48,8 @@ public interface PubSubSupport<T> {
|
|||||||
* <p/>
|
* <p/>
|
||||||
* If an unbound queuing strategy is used the call returns immediately.
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||||
*
|
|
||||||
* @return A message publication that can be used to access information about it's state
|
|
||||||
*/
|
*/
|
||||||
IMessagePublication publishAsync(T message);
|
void publishAsync(T message);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||||
@ -62,8 +58,6 @@ public interface PubSubSupport<T> {
|
|||||||
* If an unbound queuing strategy is used the call returns immediately.
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
* If a bounded queue is used the call will block until the message can be placed in the queue
|
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||||
* or the timeout is reached.
|
* or the timeout is reached.
|
||||||
*
|
|
||||||
* @return A message publication that wraps up the publication request
|
|
||||||
*/
|
*/
|
||||||
IMessagePublication publishAsync(T message, long timeout, TimeUnit unit);
|
void publishAsync(T message, long timeout, TimeUnit unit);
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.engio.mbassy.bus.config;
|
package net.engio.mbassy.bus.config;
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@ -9,8 +8,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
import net.engio.mbassy.bus.MessagePublication;
|
|
||||||
import net.engio.mbassy.listener.MetadataReader;
|
import net.engio.mbassy.listener.MetadataReader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,11 +23,9 @@ public interface Feature {
|
|||||||
|
|
||||||
public static final SyncPubSub Default(){
|
public static final SyncPubSub Default(){
|
||||||
return new SyncPubSub()
|
return new SyncPubSub()
|
||||||
.setMetadataReader(new MetadataReader())
|
.setMetadataReader(new MetadataReader());
|
||||||
.setPublicationFactory(new MessagePublication.Factory());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MessagePublication.Factory publicationFactory;
|
|
||||||
private MetadataReader metadataReader;
|
private MetadataReader metadataReader;
|
||||||
|
|
||||||
|
|
||||||
@ -42,20 +37,6 @@ public interface Feature {
|
|||||||
this.metadataReader = metadataReader;
|
this.metadataReader = metadataReader;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The message publication factory is used to wrap a published message
|
|
||||||
* in a {@link MessagePublication} for processing.
|
|
||||||
* @return The factory to be used by the bus to create the publications
|
|
||||||
*/
|
|
||||||
public MessagePublication.Factory getPublicationFactory() {
|
|
||||||
return publicationFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SyncPubSub setPublicationFactory(MessagePublication.Factory publicationFactory) {
|
|
||||||
this.publicationFactory = publicationFactory;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class AsynchronousHandlerInvocation implements Feature {
|
class AsynchronousHandlerInvocation implements Feature {
|
||||||
@ -114,13 +95,11 @@ public interface Feature {
|
|||||||
public static final AsynchronousMessageDispatch Default(){
|
public static final AsynchronousMessageDispatch Default(){
|
||||||
return new AsynchronousMessageDispatch()
|
return new AsynchronousMessageDispatch()
|
||||||
.setNumberOfMessageDispatchers(2)
|
.setNumberOfMessageDispatchers(2)
|
||||||
.setDispatcherThreadFactory(MessageDispatchThreadFactory)
|
.setDispatcherThreadFactory(MessageDispatchThreadFactory);
|
||||||
.setMessageQueue(new LinkedBlockingQueue<IMessagePublication>(Integer.MAX_VALUE));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private int numberOfMessageDispatchers;
|
private int numberOfMessageDispatchers;
|
||||||
private BlockingQueue<IMessagePublication> pendingMessages;
|
|
||||||
private ThreadFactory dispatcherThreadFactory;
|
private ThreadFactory dispatcherThreadFactory;
|
||||||
|
|
||||||
public int getNumberOfMessageDispatchers() {
|
public int getNumberOfMessageDispatchers() {
|
||||||
@ -132,14 +111,6 @@ public interface Feature {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockingQueue<IMessagePublication> getPendingMessages() {
|
|
||||||
return pendingMessages;
|
|
||||||
}
|
|
||||||
|
|
||||||
public AsynchronousMessageDispatch setMessageQueue(BlockingQueue<IMessagePublication> pendingMessages) {
|
|
||||||
this.pendingMessages = pendingMessages;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ThreadFactory getDispatcherThreadFactory() {
|
public ThreadFactory getDispatcherThreadFactory() {
|
||||||
return dispatcherThreadFactory;
|
return dispatcherThreadFactory;
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
package net.engio.mbassy.bus.error;
|
package net.engio.mbassy.bus.error;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,10 +46,10 @@ public class PublicationError{
|
|||||||
|
|
||||||
public PublicationError(final Throwable cause,
|
public PublicationError(final Throwable cause,
|
||||||
final String message,
|
final String message,
|
||||||
final IMessagePublication publication) {
|
final Object messageObject) {
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
this.message = message;
|
this.message = message;
|
||||||
this.publishedObject = publication != null ? publication.getMessage() : null;
|
this.publishedObject = messageObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -68,7 +66,7 @@ public class PublicationError{
|
|||||||
* @return The Throwable giving rise to this PublicationError.
|
* @return The Throwable giving rise to this PublicationError.
|
||||||
*/
|
*/
|
||||||
public Throwable getCause() {
|
public Throwable getCause() {
|
||||||
return cause;
|
return this.cause;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -83,7 +81,7 @@ public class PublicationError{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String getMessage() {
|
public String getMessage() {
|
||||||
return message;
|
return this.message;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PublicationError setMessage(String message) {
|
public PublicationError setMessage(String message) {
|
||||||
@ -92,7 +90,7 @@ public class PublicationError{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Method getHandler() {
|
public Method getHandler() {
|
||||||
return handler;
|
return this.handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PublicationError setHandler(Method handler) {
|
public PublicationError setHandler(Method handler) {
|
||||||
@ -101,7 +99,7 @@ public class PublicationError{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Object getListener() {
|
public Object getListener() {
|
||||||
return listener;
|
return this.listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PublicationError setListener(Object listener) {
|
public PublicationError setListener(Object listener) {
|
||||||
@ -110,7 +108,7 @@ public class PublicationError{
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Object getPublishedObject() {
|
public Object getPublishedObject() {
|
||||||
return publishedObject;
|
return this.publishedObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PublicationError setPublishedObject(Object publishedObject) {
|
public PublicationError setPublishedObject(Object publishedObject) {
|
||||||
@ -126,15 +124,15 @@ public class PublicationError{
|
|||||||
String newLine = System.getProperty("line.separator");
|
String newLine = System.getProperty("line.separator");
|
||||||
return "PublicationError{" +
|
return "PublicationError{" +
|
||||||
newLine +
|
newLine +
|
||||||
"\tcause=" + cause +
|
"\tcause=" + this.cause +
|
||||||
newLine +
|
newLine +
|
||||||
"\tmessage='" + message + '\'' +
|
"\tmessage='" + this.message + '\'' +
|
||||||
newLine +
|
newLine +
|
||||||
"\thandler=" + handler +
|
"\thandler=" + this.handler +
|
||||||
newLine +
|
newLine +
|
||||||
"\tlistener=" + listener +
|
"\tlistener=" + this.listener +
|
||||||
newLine +
|
newLine +
|
||||||
"\tpublishedObject=" + publishedObject +
|
"\tpublishedObject=" + this.publishedObject +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ import java.lang.reflect.InvocationTargetException;
|
|||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||||
import net.engio.mbassy.bus.error.PublicationError;
|
import net.engio.mbassy.bus.error.PublicationError;
|
||||||
import net.engio.mbassy.common.IConcurrentSet;
|
import net.engio.mbassy.common.IConcurrentSet;
|
||||||
@ -76,9 +75,11 @@ public class Subscription {
|
|||||||
return this.handlerMetadata.getHandledMessages();
|
return this.handlerMetadata.getHandledMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void publish(IMessagePublication publication, Object message){
|
/**
|
||||||
|
* @return TRUE if there were listeners/handlers available to publish to
|
||||||
|
*/
|
||||||
|
public boolean publishToSubscription(Object message){
|
||||||
if (this.listeners.size() > 0) {
|
if (this.listeners.size() > 0) {
|
||||||
publication.markDelivered();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delivers the given message to the given set of listeners.
|
* Delivers the given message to the given set of listeners.
|
||||||
@ -113,7 +114,11 @@ public class Subscription {
|
|||||||
handler, listener, message));
|
handler, listener, message));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3,8 +3,8 @@ package net.engio.mbassy;
|
|||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import net.engio.mbassy._misc.BusFactory;
|
||||||
import net.engio.mbassy.annotations.Handler;
|
import net.engio.mbassy.annotations.Handler;
|
||||||
import net.engio.mbassy.bus.BusFactory;
|
|
||||||
import net.engio.mbassy.bus.common.IMessageBus;
|
import net.engio.mbassy.bus.common.IMessageBus;
|
||||||
import net.engio.mbassy.common.MessageBusTest;
|
import net.engio.mbassy.common.MessageBusTest;
|
||||||
|
|
||||||
|
@ -3,8 +3,8 @@ package net.engio.mbassy;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import net.engio.mbassy.annotations.Handler;
|
import net.engio.mbassy.annotations.Handler;
|
||||||
|
import net.engio.mbassy.bus.DeadMessage;
|
||||||
import net.engio.mbassy.bus.MBassador;
|
import net.engio.mbassy.bus.MBassador;
|
||||||
import net.engio.mbassy.bus.common.DeadMessage;
|
|
||||||
import net.engio.mbassy.common.ConcurrentExecutor;
|
import net.engio.mbassy.common.ConcurrentExecutor;
|
||||||
import net.engio.mbassy.common.ListenerFactory;
|
import net.engio.mbassy.common.ListenerFactory;
|
||||||
import net.engio.mbassy.common.MessageBusTest;
|
import net.engio.mbassy.common.MessageBusTest;
|
||||||
|
@ -2,7 +2,7 @@ package net.engio.mbassy;
|
|||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import net.engio.mbassy.bus.BusFactory;
|
import net.engio.mbassy._misc.BusFactory;
|
||||||
import net.engio.mbassy.bus.MBassador;
|
import net.engio.mbassy.bus.MBassador;
|
||||||
import net.engio.mbassy.bus.common.IMessageBus;
|
import net.engio.mbassy.bus.common.IMessageBus;
|
||||||
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
|
||||||
|
@ -2,10 +2,10 @@ package net.engio.mbassy;
|
|||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import net.engio.mbassy.annotations.Handler;
|
import net.engio.mbassy.annotations.Handler;
|
||||||
import net.engio.mbassy.annotations.Synchronized;
|
import net.engio.mbassy.annotations.Synchronized;
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
import net.engio.mbassy.bus.common.IMessageBus;
|
import net.engio.mbassy.bus.common.IMessageBus;
|
||||||
import net.engio.mbassy.bus.config.Feature;
|
import net.engio.mbassy.bus.config.Feature;
|
||||||
import net.engio.mbassy.bus.config.IBusConfiguration;
|
import net.engio.mbassy.bus.config.IBusConfiguration;
|
||||||
@ -21,8 +21,7 @@ import org.junit.Test;
|
|||||||
*/
|
*/
|
||||||
public class SynchronizedHandlerTest extends MessageBusTest {
|
public class SynchronizedHandlerTest extends MessageBusTest {
|
||||||
|
|
||||||
|
private static AtomicInteger counter = new AtomicInteger(0);
|
||||||
private static int incrementsPerMessage = 10000;
|
|
||||||
private static int numberOfMessages = 1000;
|
private static int numberOfMessages = 1000;
|
||||||
private static int numberOfListeners = 1000;
|
private static int numberOfListeners = 1000;
|
||||||
|
|
||||||
@ -30,8 +29,8 @@ public class SynchronizedHandlerTest extends MessageBusTest {
|
|||||||
public void testSynchronizedWithSynchronousInvocation(){
|
public void testSynchronizedWithSynchronousInvocation(){
|
||||||
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
|
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
|
||||||
IBusConfiguration config = SyncAsync();
|
IBusConfiguration config = SyncAsync();
|
||||||
config.getFeature(Feature.AsynchronousMessageDispatch.class)
|
config.getFeature(Feature.AsynchronousMessageDispatch.class).setNumberOfMessageDispatchers(6);
|
||||||
.setNumberOfMessageDispatchers(6);
|
|
||||||
IMessageBus bus = createBus(config);
|
IMessageBus bus = createBus(config);
|
||||||
for(int i = 0; i < numberOfListeners; i++){
|
for(int i = 0; i < numberOfListeners; i++){
|
||||||
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
|
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
|
||||||
@ -39,31 +38,29 @@ public class SynchronizedHandlerTest extends MessageBusTest {
|
|||||||
bus.subscribe(handler);
|
bus.subscribe(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
IMessagePublication publication = null;
|
|
||||||
for(int i = 0; i < numberOfMessages; i++){
|
for(int i = 0; i < numberOfMessages; i++){
|
||||||
publication = bus.publishAsync(new Object());
|
bus.publishAsync(new Object());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int totalCount = numberOfListeners * numberOfMessages;
|
||||||
|
int expireCount = 1000;
|
||||||
|
|
||||||
// wait for last publication
|
// wait for last publication
|
||||||
while (!publication.isFinished()){
|
while (expireCount-- > 0 && counter.get() < totalCount){
|
||||||
pause(100);
|
pause(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(SynchronizedWithSynchronousDelivery handler : handlers){
|
if (expireCount <= 0) {
|
||||||
assertEquals(incrementsPerMessage * numberOfMessages, handler.counter);
|
fail("Count '" + counter.get() + "' was incorrect!");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SynchronizedWithSynchronousDelivery {
|
public static class SynchronizedWithSynchronousDelivery {
|
||||||
|
|
||||||
private int counter = 0;
|
|
||||||
|
|
||||||
@Handler
|
@Handler
|
||||||
@Synchronized
|
@Synchronized
|
||||||
public void handleMessage(Object o){
|
public void handleMessage(Object o){
|
||||||
for(int i = 0; i < incrementsPerMessage; i++){
|
counter.getAndIncrement();
|
||||||
this.counter++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
package net.engio.mbassy.common;
|
package net.engio.mbassy.common;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import net.engio.mbassy.bus.IMessagePublication;
|
|
||||||
import net.engio.mbassy.bus.MBassador;
|
import net.engio.mbassy.bus.MBassador;
|
||||||
import net.engio.mbassy.bus.config.BusConfiguration;
|
import net.engio.mbassy.bus.config.BusConfiguration;
|
||||||
import net.engio.mbassy.bus.config.Feature;
|
import net.engio.mbassy.bus.config.Feature;
|
||||||
@ -40,11 +37,9 @@ public abstract class MessageBusTest extends AssertSupport {
|
|||||||
};
|
};
|
||||||
|
|
||||||
private static final Object mapObject = new Object();
|
private static final Object mapObject = new Object();
|
||||||
private ConcurrentHashMap<IMessagePublication, Object> issuedPublications = new ConcurrentHashMap<IMessagePublication, Object>();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp(){
|
public void setUp(){
|
||||||
this.issuedPublications = new ConcurrentHashMap<IMessagePublication, Object>();
|
|
||||||
for(MessageTypes mes : MessageTypes.values()) {
|
for(MessageTypes mes : MessageTypes.values()) {
|
||||||
mes.reset();
|
mes.reset();
|
||||||
}
|
}
|
||||||
@ -69,27 +64,4 @@ public abstract class MessageBusTest extends AssertSupport {
|
|||||||
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
|
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
|
||||||
return bus;
|
return bus;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void track(IMessagePublication asynchronously) {
|
|
||||||
this.issuedPublications.put(asynchronously, mapObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForPublications(long timeOutInMs){
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
while(this.issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){
|
|
||||||
for(IMessagePublication pub : this.issuedPublications.keySet()){
|
|
||||||
if(pub.isFinished()) {
|
|
||||||
this.issuedPublications.remove(pub);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(this.issuedPublications.size() > 0) {
|
|
||||||
fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addPublication(IMessagePublication publication){
|
|
||||||
this.issuedPublications.put(publication, mapObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user