diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java index b59b2c8..164affc 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java @@ -35,7 +35,8 @@ public abstract class AbstractSyncMessageBus i public AbstractSyncMessageBus(ISyncBusConfiguration configuration) { this.runtime = new BusRuntime(this); this.runtime.add("error.handlers", getRegisteredErrorHandlers()); - this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(), + this.subscriptionManager = configuration.getSubscriptionManagerProvider() + .createManager(configuration.getMetadataReader(), configuration.getSubscriptionFactory(), runtime); this.publicationFactory = configuration.getMessagePublicationFactory(); } diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java index fdad230..2bda865 100644 --- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java @@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManagerProvider; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -95,6 +97,7 @@ public class BusConfiguration implements IBusConfiguration { defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory()); defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)); defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory); + defaultConfig.setSubscriptionManagerProvider(new SubscriptionManagerProvider()); return defaultConfig; } @@ -108,6 +111,7 @@ public class BusConfiguration implements IBusConfiguration { protected MetadataReader metadataReader; protected MessagePublication.Factory messagePublicationFactory; protected ThreadFactory dispatcherThreadFactory; + protected ISubscriptionManagerProvider subscriptionManagerProvider; public void setPendingMessagesQueue(BlockingQueue pendingMessagesQueue) { this.pendingMessagesQueue = pendingMessagesQueue; @@ -184,4 +188,11 @@ public class BusConfiguration implements IBusConfiguration { return this; } + public ISubscriptionManagerProvider getSubscriptionManagerProvider() { + return subscriptionManagerProvider; + } + + public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { + this.subscriptionManagerProvider = subscriptionManagerProvider; + } } diff --git a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java index a979830..4922cfc 100644 --- a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java @@ -2,6 +2,7 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; public interface ISyncBusConfiguration { @@ -11,5 +12,7 @@ public interface ISyncBusConfiguration { MetadataReader getMetadataReader(); SubscriptionFactory getSubscriptionFactory(); + + ISubscriptionManagerProvider getSubscriptionManagerProvider(); } \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java index d776540..9d1a6eb 100644 --- a/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java @@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManagerProvider; /** * Todo: Add javadoc @@ -15,11 +17,13 @@ public class SyncBusConfiguration> implements protected SubscriptionFactory subscriptionFactory; protected MetadataReader metadataReader; protected MessagePublication.Factory messagePublicationFactory; + protected ISubscriptionManagerProvider subscriptionManagerProvider; public SyncBusConfiguration() { this.metadataReader = new MetadataReader(); this.subscriptionFactory = new SubscriptionFactory(); this.messagePublicationFactory = new MessagePublication.Factory(); + this.subscriptionManagerProvider = new SubscriptionManagerProvider(); } public MessagePublication.Factory getMessagePublicationFactory() { @@ -47,4 +51,13 @@ public class SyncBusConfiguration> implements this.subscriptionFactory = subscriptionFactory; return (C) this; } + + public ISubscriptionManagerProvider getSubscriptionManagerProvider() { + return subscriptionManagerProvider; + } + + public C setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { + this.subscriptionManagerProvider = subscriptionManagerProvider; + return (C) this; + } } diff --git a/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java new file mode 100644 index 0000000..4094bab --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.listener.MetadataReader; + +public interface ISubscriptionManagerProvider { + SubscriptionManager createManager(MetadataReader reader, + SubscriptionFactory factory, BusRuntime runtime); +} diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java new file mode 100644 index 0000000..fd6f319 --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java @@ -0,0 +1,12 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.listener.MetadataReader; + +public class SubscriptionManagerProvider implements ISubscriptionManagerProvider { + @Override + public SubscriptionManager createManager(MetadataReader reader, + SubscriptionFactory factory, BusRuntime runtime) { + return new SubscriptionManager(reader, factory, runtime); + } +}