Merge remote-tracking branch 'origin/master'

This commit is contained in:
bennidi 2014-01-09 15:39:14 +01:00
commit fe696c2f95
6 changed files with 50 additions and 1 deletions

View File

@ -35,7 +35,8 @@ public abstract class AbstractSyncMessageBus<T, P extends IPublicationCommand> i
public AbstractSyncMessageBus(ISyncBusConfiguration configuration) {
this.runtime = new BusRuntime(this);
this.runtime.add("error.handlers", getRegisteredErrorHandlers());
this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(),
this.subscriptionManager = configuration.getSubscriptionManagerProvider()
.createManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory(), runtime);
this.publicationFactory = configuration.getMessagePublicationFactory();
}

View File

@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@ -95,6 +97,7 @@ public class BusConfiguration implements IBusConfiguration {
defaultConfig.setMessagePublicationFactory(new MessagePublication.Factory());
defaultConfig.setPendingMessagesQueue(new LinkedBlockingQueue<MessagePublication>(Integer.MAX_VALUE));
defaultConfig.setThreadFactoryForAsynchronousMessageDispatch(DispatcherThreadFactory);
defaultConfig.setSubscriptionManagerProvider(new SubscriptionManagerProvider());
return defaultConfig;
}
@ -108,6 +111,7 @@ public class BusConfiguration implements IBusConfiguration {
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
protected ThreadFactory dispatcherThreadFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public void setPendingMessagesQueue(BlockingQueue<MessagePublication> pendingMessagesQueue) {
this.pendingMessagesQueue = pendingMessagesQueue;
@ -184,4 +188,11 @@ public class BusConfiguration implements IBusConfiguration {
return this;
}
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public void setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
}
}

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
public interface ISyncBusConfiguration {
@ -11,5 +12,7 @@ public interface ISyncBusConfiguration {
MetadataReader getMetadataReader();
SubscriptionFactory getSubscriptionFactory();
ISubscriptionManagerProvider getSubscriptionManagerProvider();
}

View File

@ -2,7 +2,9 @@ package net.engio.mbassy.bus.config;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.ISubscriptionManagerProvider;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManagerProvider;
/**
* Todo: Add javadoc
@ -15,11 +17,13 @@ public class SyncBusConfiguration<C extends SyncBusConfiguration<C>> implements
protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
protected ISubscriptionManagerProvider subscriptionManagerProvider;
public SyncBusConfiguration() {
this.metadataReader = new MetadataReader();
this.subscriptionFactory = new SubscriptionFactory();
this.messagePublicationFactory = new MessagePublication.Factory();
this.subscriptionManagerProvider = new SubscriptionManagerProvider();
}
public MessagePublication.Factory getMessagePublicationFactory() {
@ -47,4 +51,13 @@ public class SyncBusConfiguration<C extends SyncBusConfiguration<C>> implements
this.subscriptionFactory = subscriptionFactory;
return (C) this;
}
public ISubscriptionManagerProvider getSubscriptionManagerProvider() {
return subscriptionManagerProvider;
}
public C setSubscriptionManagerProvider(ISubscriptionManagerProvider subscriptionManagerProvider) {
this.subscriptionManagerProvider = subscriptionManagerProvider;
return (C) this;
}
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public interface ISubscriptionManagerProvider {
SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime);
}

View File

@ -0,0 +1,12 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.BusRuntime;
import net.engio.mbassy.listener.MetadataReader;
public class SubscriptionManagerProvider implements ISubscriptionManagerProvider {
@Override
public SubscriptionManager createManager(MetadataReader reader,
SubscriptionFactory factory, BusRuntime runtime) {
return new SubscriptionManager(reader, factory, runtime);
}
}