From 5612cbee517d2bd74afcab22fac2fe0f4f84f953 Mon Sep 17 00:00:00 2001 From: durron597 Date: Mon, 25 Nov 2013 16:33:12 -0600 Subject: [PATCH] Added the ability to plug in your own subscription manager. This way you can implement caching if you want to, or not if you don't want to. Existing applications should be unaffected. --- .../engio/mbassy/bus/AbstractSyncMessageBus.java | 3 ++- .../engio/mbassy/bus/config/BusConfiguration.java | 11 +++++++++++ .../mbassy/bus/config/ISyncBusConfiguration.java | 3 +++ .../mbassy/bus/config/SyncBusConfiguration.java | 13 +++++++++++++ .../subscription/ISubscriptionManagerProvider.java | 9 +++++++++ .../subscription/SubscriptionManagerProvider.java | 12 ++++++++++++ 6 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java create mode 100644 src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java index b59b2c8..164affc 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java @@ -35,7 +35,8 @@ public abstract class AbstractSyncMessageBus i public AbstractSyncMessageBus(ISyncBusConfiguration configuration) { this.runtime = new BusRuntime(this); this.runtime.add("error.handlers", getRegisteredErrorHandlers()); - this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(), + this.subscriptionManager = configuration.getSubscriptionManagerProvider() + .createManager(configuration.getMetadataReader(), configuration.getSubscriptionFactory(), runtime); this.publicationFactory = configuration.getMessagePublicationFactory(); } diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java index fdad230..2bda865 100644 --- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java @@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManagerProvider; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -95,6 +97,7 @@ public class BusConfiguration implements IBusConfiguration { defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory()); defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)); defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory); + defaultConfig.setSubscriptionManagerProvider(new SubscriptionManagerProvider()); return defaultConfig; } @@ -108,6 +111,7 @@ public class BusConfiguration implements IBusConfiguration { protected MetadataReader metadataReader; protected MessagePublication.Factory messagePublicationFactory; protected ThreadFactory dispatcherThreadFactory; + protected ISubscriptionManagerProvider subscriptionManagerProvider; public void setPendingMessagesQueue(BlockingQueue pendingMessagesQueue) { this.pendingMessagesQueue = pendingMessagesQueue; @@ -184,4 +188,11 @@ public class BusConfiguration implements IBusConfiguration { return this; } + public ISubscriptionManagerProvider getSubscriptionManagerProvider() { + return subscriptionManagerProvider; + } + + public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { + this.subscriptionManagerProvider = subscriptionManagerProvider; + } } diff --git a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java index a979830..4922cfc 100644 --- a/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/ISyncBusConfiguration.java @@ -2,6 +2,7 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; public interface ISyncBusConfiguration { @@ -11,5 +12,7 @@ public interface ISyncBusConfiguration { MetadataReader getMetadataReader(); SubscriptionFactory getSubscriptionFactory(); + + ISubscriptionManagerProvider getSubscriptionManagerProvider(); } \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java index d776540..9d1a6eb 100644 --- a/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/config/SyncBusConfiguration.java @@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config; import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.ISubscriptionManagerProvider; import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManagerProvider; /** * Todo: Add javadoc @@ -15,11 +17,13 @@ public class SyncBusConfiguration> implements protected SubscriptionFactory subscriptionFactory; protected MetadataReader metadataReader; protected MessagePublication.Factory messagePublicationFactory; + protected ISubscriptionManagerProvider subscriptionManagerProvider; public SyncBusConfiguration() { this.metadataReader = new MetadataReader(); this.subscriptionFactory = new SubscriptionFactory(); this.messagePublicationFactory = new MessagePublication.Factory(); + this.subscriptionManagerProvider = new SubscriptionManagerProvider(); } public MessagePublication.Factory getMessagePublicationFactory() { @@ -47,4 +51,13 @@ public class SyncBusConfiguration> implements this.subscriptionFactory = subscriptionFactory; return (C) this; } + + public ISubscriptionManagerProvider getSubscriptionManagerProvider() { + return subscriptionManagerProvider; + } + + public C setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) { + this.subscriptionManagerProvider = subscriptionManagerProvider; + return (C) this; + } } diff --git a/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java new file mode 100644 index 0000000..4094bab --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/ISubscriptionManagerProvider.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.listener.MetadataReader; + +public interface ISubscriptionManagerProvider { + SubscriptionManager createManager(MetadataReader reader, + SubscriptionFactory factory, BusRuntime runtime); +} diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java new file mode 100644 index 0000000..fd6f319 --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManagerProvider.java @@ -0,0 +1,12 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.bus.BusRuntime; +import net.engio.mbassy.listener.MetadataReader; + +public class SubscriptionManagerProvider implements ISubscriptionManagerProvider { + @Override + public SubscriptionManager createManager(MetadataReader reader, + SubscriptionFactory factory, BusRuntime runtime) { + return new SubscriptionManager(reader, factory, runtime); + } +}