- * @param
*/
-public abstract class AbstractSyncMessageBus implements ISyncMessageBus{
+public abstract class AbstractPubSubSupport implements PubSubSupport{
// this handler will receive all errors that occur during message dispatch or message handling
@@ -32,7 +30,7 @@ public abstract class AbstractSyncMessageBus i
private final BusRuntime runtime;
- public AbstractSyncMessageBus(ISyncBusConfiguration configuration) {
+ public AbstractPubSubSupport(ISyncBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.subscriptionManager = configuration.getSubscriptionManagerProvider()
@@ -45,7 +43,7 @@ public abstract class AbstractSyncMessageBus i
return publicationFactory;
}
- @Override
+
public Collection getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers);
}
diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
index d4187d1..d6b7e27 100644
--- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
+++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
@@ -17,7 +17,8 @@ import java.util.concurrent.TimeUnit;
* @param
* @param
*/
-public abstract class AbstractSyncAsyncMessageBus extends AbstractSyncMessageBus implements IMessageBus {
+public abstract class AbstractSyncAsyncMessageBus
+ extends AbstractPubSubSupport implements IMessageBus {
// executor for asynchronous message handlers
private final ExecutorService executor;
@@ -47,13 +48,15 @@ public abstract class AbstractSyncAsyncMessageBus extends PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport {
+public interface IMessageBus
+ extends PubSubSupport, ErrorHandlingSupport, GenericMessagePublicationSupport, ISyncMessageBus {
/**
* {@inheritDoc}
diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java
index e99f937..ee4206f 100644
--- a/src/main/java/net/engio/mbassy/bus/MBassador.java
+++ b/src/main/java/net/engio/mbassy/bus/MBassador.java
@@ -16,12 +16,12 @@ public class MBassador extends AbstractSyncAsyncMessageBus subscriptions, Object message, State initialState) {
@@ -86,6 +86,10 @@ public class MessagePublication {
return FilteredMessage.class.isAssignableFrom(message.getClass());
}
+ public Object getMessage() {
+ return message;
+ }
+
private enum State {
Initial, Scheduled, Running, Finished, Error
}
diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
index 72c45f4..9dfb911 100644
--- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
+++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
@@ -5,25 +5,18 @@ import net.engio.mbassy.bus.config.ISyncBusConfiguration;
import net.engio.mbassy.bus.publication.IPublicationCommand;
/**
- * Created with IntelliJ IDEA.
- * User: benjamin
- * Date: 4/3/13
- * Time: 9:02 AM
- * To change this template use File | Settings | File Templates.
+ * A message bus implementation that offers only synchronous message publication. Using this bus
+ * will not create any new threads.
+ *
*/
-public class SyncMessageBus extends AbstractSyncMessageBus{
+public class SyncMessageBus extends AbstractPubSubSupport implements ISyncMessageBus{
public SyncMessageBus(ISyncBusConfiguration configuration) {
super(configuration);
}
- /**
- * Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
- * The call blocks until every messageHandler has processed the message.
- *
- * @param message
- */
+ @Override
public void publish(T message) {
try {
MessagePublication publication = createMessagePublication(message);
@@ -34,7 +27,6 @@ public class SyncMessageBus extends AbstractSyncMessageBus extends AbstractSyncMessageBus handledByListener = new HashMap();
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void resetAll(){
for(MessageTypes m : values())
m.reset();
}
- private Map handledByListener = new HashMap();
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
@Override
public void reset() {