Added the ability to plug in your own subscription manager. This way you

can implement caching if you want to, or not if you don't want to.
Existing applications should be unaffected.
This commit is contained in:
durron597 2013-11-25 16:33:12 -06:00
parent eb555ca1af
commit 5612cbee51
6 changed files with 50 additions and 1 deletions

View File

@ -35,7 +35,8 @@ public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> i
public AbstractSyncMessageBus(ISyncBusConfiguration configuration) { public AbstractSyncMessageBus(ISyncBusConfiguration configuration) {
this.runtime = new BusRuntime(this); this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers()); this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(), this.subscriptionManager = configuration.getSubscriptionManagerProvider()
.createManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory(), runtime); configuration.getSubscriptionFactory(), runtime);
this.publicationFactory = configuration.getMessagePublicationFactory(); this.publicationFactory = configuration.getMessagePublicationFactory();
} }

View File

@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -95,6 +97,7 @@ public class BusConfiguration implements IBusConfiguration {
defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory()); defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory());
defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE)); defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory); defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory);
defaultConfig.setSubscriptionManagerProvider(new SubscriptionManagerProvider());
return defaultConfig; return defaultConfig;
} }
@ -108,6 +111,7 @@ public class BusConfiguration implements IBusConfiguration {
protected MetadataReader metadataReader; protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory; protected MessagePublication.Factory messagePublicationFactory;
protected ThreadFactory dispatcherThreadFactory; protected ThreadFactory dispatcherThreadFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public void setPendingMessagesQueue(BlockingQueue<MessagePublication> pendingMessagesQueue) { public void setPendingMessagesQueue(BlockingQueue<MessagePublication> pendingMessagesQueue) {
this.pendingMessagesQueue = pendingMessagesQueue; this.pendingMessagesQueue = pendingMessagesQueue;
@ -184,4 +188,11 @@ public class BusConfiguration implements IBusConfiguration {
return this; return this;
} }
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
}
} }

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionFactory;
public interface ISyncBusConfiguration { public interface ISyncBusConfiguration {
@ -11,5 +12,7 @@ public interface ISyncBusConfiguration {
MetadataReader getMetadataReader(); MetadataReader getMetadataReader();
SubscriptionFactory getSubscriptionFactory(); SubscriptionFactory getSubscriptionFactory();
ISubscriptionManagerProvider getSubscriptionManagerProvider();
} }

View File

@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
/** /**
* Todo: Add javadoc * Todo: Add javadoc
@ -15,11 +17,13 @@ public class SyncBusConfiguration<C extends SyncBusConfiguration<C>> implements
protected SubscriptionFactory subscriptionFactory; protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader; protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory; protected MessagePublication.Factory messagePublicationFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public SyncBusConfiguration() { public SyncBusConfiguration() {
this.metadataReader = new MetadataReader(); this.metadataReader = new MetadataReader();
this.subscriptionFactory = new SubscriptionFactory(); this.subscriptionFactory = new SubscriptionFactory();
this.messagePublicationFactory = new MessagePublication.Factory(); this.messagePublicationFactory = new MessagePublication.Factory();
this.subscriptionManagerProvider = new SubscriptionManagerProvider();
} }
public MessagePublication.Factory getMessagePublicationFactory() { public MessagePublication.Factory getMessagePublicationFactory() {
@ -47,4 +51,13 @@ public class SyncBusConfiguration<C extends SyncBusConfiguration<C>> implements
this.subscriptionFactory = subscriptionFactory; this.subscriptionFactory = subscriptionFactory;
return (C) this; return (C) this;
} }
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public C setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return (C) this;
}
} }

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public interface ISubscriptionManagerProvider {
SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime);
}

View File

@ -0,0 +1,12 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public class SubscriptionManagerProvider implements ISubscriptionManagerProvider {
@Override
public SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime) {
return new SubscriptionManager(reader, factory, runtime);
}
}