first steps towards release 1.2.1

This commit is contained in:
Benjamin Diedrichsen 2014-11-02 09:54:53 +01:00
parent 7cd5a71e79
commit 1ab19f84c9
20 changed files with 129 additions and 108 deletions

View File

@ -127,7 +127,7 @@ Creation of message bus and registration of listeners:
// create as many instances as necessary
// bind it to any upper bound
MBassador<TestMessage> bus = new MBassador<TestMessage>(BusConfiguration.SyncAsync());
MBassador<TestMessage> bus = new MBassador<TestMessage>();
ListeningBean listener = new ListeningBean();
// the listener will be registered using a weak-reference if not configured otherwise with @Listener
bus.subscribe(listener);
@ -161,15 +161,19 @@ You can also download binary release and javadoc from the [maven central reposit
There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus works. Code samples can also be found in the various test cases. Please read about the terminology used in this project to avoid confusion and misunderstanding.
<h2>Release Notes</h2>
<h3>1.2.1</h3>
+ API-Changes:
+ Removed deprecated method BusConfiguration.SyncAsync() -> use MBassador default constructor instead
+ Deleted interface ISyncMessageBus since it was merely an aggregation of existing interfaces -> replace with GenericMessagePublicationSupport
<h3>1.2.0</h3>
+ Added support for conditional handlers using Java EL. Thanks to Bernd Rosstauscher
for the initial implementation.
+ BREAKING CHANGES in BusConfiguration
++ Complete redesign of configuration setup using Features instead of simple get/set parameters. This will allow
+ Complete redesign of configuration setup using Features instead of simple get/set parameters. This will allow
to flexibly combine features and still be able to exclude those not available in certain environments,for example, threading and reflection in GWT (this will be part of future releases)
++ Properties formerly located in BusConfiguration now moved to their respective Feature class
++ Removed all SyncXX related interfaces and config implementations. There is now only one `BusConfiguration`
+ Properties formerly located in BusConfiguration now moved to their respective Feature class
+ Removed all SyncXX related interfaces and config implementations. There is now only one `BusConfiguration`
with its corresponding interface which will be used for all types of message bus implementations

View File

@ -84,7 +84,8 @@
<dependency>
<groupId>javax.el</groupId>
<artifactId>el-api</artifactId>
<version>2.2</version>
<scope>provided</scope>
<version>2.2</version>
</dependency>
<dependency>
<groupId>de.odysseus.juel</groupId>

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
@ -20,7 +19,7 @@ public class BusFactory {
*
* @return
*/
public static ISyncMessageBus SynchronousOnly(){
public static SyncMessageBus SynchronousOnly(){
BusConfiguration syncPubSubCfg = new BusConfiguration();
syncPubSubCfg.addFeature(Feature.SyncPubSub.Default());
return new SyncMessageBus(syncPubSubCfg);

View File

@ -1,6 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
@ -14,6 +16,13 @@ public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCo
super(configuration);
}
public MBassador(){
super(new BusConfiguration()
.addFeature(Feature.SyncPubSub.Default())
.addFeature(Feature.AsynchronousHandlerInvocation.Default())
.addFeature(Feature.AsynchronousMessageDispatch.Default()));
}
public MessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));

View File

@ -1,6 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.common.ErrorHandlingSupport;
import net.engio.mbassy.bus.common.GenericMessagePublicationSupport;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.IPublicationCommand;
@ -10,7 +12,7 @@ import net.engio.mbassy.bus.publication.IPublicationCommand;
* will not create any new threads.
*
*/
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements ISyncMessageBus<T, SyncMessageBus.SyncPostCommand> {
public class SyncMessageBus<T> extends AbstractPubSubSupport<T> implements PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, SyncMessageBus.SyncPostCommand>{
public SyncMessageBus(IBusConfiguration configuration) {

View File

@ -9,7 +9,7 @@ import net.engio.mbassy.bus.publication.IPublicationCommand;
* The parametrization of the IPostCommand influences which publication flavours are available.
*
*/
public interface GenericMessagePublicationSupport<T, P extends IPublicationCommand> {
public interface GenericMessagePublicationSupport<T, P extends IPublicationCommand> extends PubSubSupport<T>, ErrorHandlingSupport{
/**
* Publish a message to the bus using on of its supported message publication mechanisms. The supported

View File

@ -58,7 +58,7 @@ import java.util.concurrent.Executor;
* Date: 2/8/12
*/
public interface IMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends PubSubSupport<T>, ErrorHandlingSupport, GenericMessagePublicationSupport<T, P>, ISyncMessageBus<T,P> {
extends GenericMessagePublicationSupport<T, P>{
/**
* {@inheritDoc}

View File

@ -8,24 +8,6 @@ import java.util.Map;
*/
public class BusConfiguration implements IBusConfiguration {
/**
* Creates a new instance, using the default settings of 2 dispatchers, and
* asynchronous handlers with an initial count equal to the number of
* available processors in the machine, with maximum count equal to
* 2 * the number of available processors. Uses {@link Runtime#availableProcessors()} to
* determine the number of available processors
*
* @deprecated Use feature driven configuration instead
**/
@Deprecated()
public static BusConfiguration SyncAsync() {
BusConfiguration defaultConfig = new BusConfiguration();
defaultConfig.addFeature(Feature.SyncPubSub.Default());
defaultConfig.addFeature(Feature.AsynchronousHandlerInvocation.Default());
defaultConfig.addFeature(Feature.AsynchronousMessageDispatch.Default());
return defaultConfig;
}
// the registered features
private Map<Class<? extends Feature>, Feature> features = new HashMap<Class<? extends Feature>, Feature>();
@ -46,6 +28,6 @@ public class BusConfiguration implements IBusConfiguration {
@Override
public IBusConfiguration addErrorHandler(ConfigurationErrorHandler handler) {
return null; //To change body of implemented methods use File | Settings | File Templates.
return null; // TODO: implement configuration validation
}
}

View File

@ -56,7 +56,14 @@ public class ReflectionUtils
return null;
}
public static Set<Class> getSuperclasses( Class from ) {
/**
* Collect all directly and indirectly related super types (classes and interfaces) of
* a given class.
*
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static Set<Class> getSuperTypes(Class from) {
Set<Class> superclasses = new HashSet<Class>();
collectInterfaces( from, superclasses );
while ( !from.equals( Object.class ) && !from.isInterface() ) {
@ -83,43 +90,41 @@ public class ReflectionUtils
return false;
}
public static <A extends Annotation> A getAnnotation( Method method, Class<A> annotationType ) {
return getAnnotation( (AnnotatedElement) method, annotationType );
}
public static <A extends Annotation> A getAnnotation( Class from, Class<A> annotationType ) {
return getAnnotation( (AnnotatedElement) from, annotationType );
}
/**
* Searches for an Annotation of the given type on the class. Supports meta annotations.
*
* @param from AnnotatedElement (class, method...)
* @param annotationType Annotation class to look for.
* @param <A> Annotation class
* @param <A> Class of annotation type
* @return Annotation instance or null
*/
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType ) {
A ann = from.getAnnotation( annotationType );
if ( ann == null ) {
for ( Annotation metaAnn : from.getAnnotations() ) {
ann = metaAnn.annotationType().getAnnotation( annotationType );
if ( ann != null ) {
break;
}
}
}
return ann;
private static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType, Set<AnnotatedElement> visited) {
if( visited.contains(from) ) return null;
visited.add(from);
A ann = from.getAnnotation( annotationType );
if( ann != null) return ann;
for ( Annotation metaAnn : from.getAnnotations() ) {
ann = getAnnotation(metaAnn.annotationType(), annotationType, visited);
if ( ann != null ) {
return ann;
}
}
return null;
}
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType){
return getAnnotation(from, annotationType, new HashSet<AnnotatedElement>());
}
private static boolean isOverriddenBy( Method superclassMethod, Method subclassMethod ) {
// if the declaring classes are the same or the subclass method is not defined in the subclass
// hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod
if ( superclassMethod.getDeclaringClass().equals(
subclassMethod.getDeclaringClass() ) || !superclassMethod.getDeclaringClass().isAssignableFrom(
subclassMethod.getDeclaringClass() ) || !superclassMethod.getName().equals(
subclassMethod.getName() ) ) {
if ( superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass() )
|| !superclassMethod.getDeclaringClass().isAssignableFrom( subclassMethod.getDeclaringClass() )
|| !superclassMethod.getName().equals(subclassMethod.getName())) {
return false;
}

View File

@ -4,6 +4,7 @@ import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.el.ElFilter;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
@ -49,7 +50,7 @@ public class MessageHandler {
if(filter == null){
filter = new IMessageFilter[]{};
}
net.engio.mbassy.listener.Enveloped enveloped = ReflectionUtils.getAnnotation( handler, Enveloped.class );
Enveloped enveloped = ReflectionUtils.getAnnotation( handler, Enveloped.class );
Class[] handledMessages = enveloped != null
? enveloped.messages()
: handler.getParameterTypes();
@ -96,7 +97,7 @@ public class MessageHandler {
private final IMessageFilter[] filter;
private String condition;
private final String condition;
private final int priority;
@ -153,6 +154,10 @@ public class MessageHandler {
}
public <A extends Annotation> A getAnnotation(Class<A> annotationType){
return ReflectionUtils.getAnnotation(handler,annotationType);
}
public boolean isSynchronized(){
return isSynchronized;
}

View File

@ -165,7 +165,7 @@ public class SubscriptionManager {
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
for (Class eventSuperType : ReflectionUtils.getSuperTypes(messageType)) {
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {

View File

@ -1,7 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Handler;
@ -94,7 +93,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testSimpleStringCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("TEST", 0);
@ -110,7 +109,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testSimpleNumberCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = new MBassador();
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 5);
@ -125,7 +124,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testHandleCombinedEL() throws Exception {
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 3);
@ -140,7 +139,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testNotMatchingAnyCondition() throws Exception {
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("", 0);
@ -154,7 +153,7 @@ public class ConditionalHandlers extends MessageBusTest {
************************************************************************/
@Test
public void testHandleMethodAccessEL() throws Exception {
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
bus.subscribe(new ConditionalMessageListener());
TestEvent message = new TestEvent("XYZ", 1);

View File

@ -1,9 +1,7 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.subscription.MessageEnvelope;
import org.junit.Test;
@ -35,6 +33,17 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
String[] value();
}
/**
* Handler annotation that adds a default filter on the NamedMessage.
* Enveloped is in no way required, but simply added to test a meta enveloped annotation.
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@NamedMessageHandler("messageThree")
static @interface MessageThree {}
/**
* Test enveloped meta annotation.
*/
@ -59,16 +68,12 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
{
@Override
public boolean accepts( NamedMessage message, MessageHandler metadata ) {
NamedMessageHandler namedMessageHandler =
ReflectionUtils.getAnnotation( metadata.getHandler(), NamedMessageHandler.class );
NamedMessageHandler namedMessageHandler = metadata.getAnnotation(NamedMessageHandler.class);
if ( namedMessageHandler != null ) {
return Arrays.asList( namedMessageHandler.value() ).contains( message.getName() );
}
EnvelopedNamedMessageHandler envelopedHandler =
ReflectionUtils.getAnnotation( metadata.getHandler(), EnvelopedNamedMessageHandler.class );
EnvelopedNamedMessageHandler envelopedHandler = metadata.getAnnotation(EnvelopedNamedMessageHandler.class);
return envelopedHandler != null && Arrays.asList( envelopedHandler.value() ).contains( message.getName() );
}
@ -103,7 +108,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
handledByTwo.add( (NamedMessage) envelope.getMessage() );
}
@NamedMessageHandler("messageThree")
@MessageThree
void handlerThree( NamedMessage message ) {
handledByThree.add( message );
}
@ -111,7 +116,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
@Test
public void testMetaHandlerFiltering() {
MBassador bus = getBus( BusConfiguration.SyncAsync() );
MBassador bus = createBus(SyncAsync());
NamedMessageListener listener = new NamedMessageListener();
bus.subscribe( listener );
@ -124,16 +129,15 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
bus.publish( messageTwo );
bus.publish( messageThree );
assertEquals(2, listener.handledByOne.size());
assertTrue( listener.handledByOne.contains( messageOne ) );
assertTrue( listener.handledByOne.contains( messageTwo ) );
assertFalse( listener.handledByOne.contains( messageThree ) );
assertTrue(listener.handledByOne.contains(messageTwo));
assertFalse( listener.handledByTwo.contains( messageOne ) );
assertEquals(2, listener.handledByTwo.size());
assertTrue( listener.handledByTwo.contains( messageTwo ) );
assertTrue( listener.handledByTwo.contains( messageThree ) );
assertFalse( listener.handledByThree.contains( messageOne ) );
assertFalse( listener.handledByThree.contains( messageTwo ) );
assertEquals(1, listener.handledByThree.size());
assertTrue( listener.handledByThree.contains( messageThree ) );
}
}

View File

@ -2,7 +2,6 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
@ -32,7 +31,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test
public void testDeadMessage(){
final MBassador bus = getBus(BusConfiguration.SyncAsync());
final MBassador bus = createBus(SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.AsyncListener.class)
@ -70,7 +69,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test
public void testUnsubscribingAllListeners() {
final MBassador bus = getBus(BusConfiguration.SyncAsync());
final MBassador bus = createBus(SyncAsync());
ListenerFactory deadMessageListener = new ListenerFactory()
.create(InstancesPerListener, DeadMessagHandler.class)
.create(InstancesPerListener, Object.class);

View File

@ -3,7 +3,6 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.common.ListenerFactory;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
@ -31,7 +30,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
@ -56,7 +55,7 @@ public class FilterTest extends MessageBusTest {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
@ -102,7 +101,7 @@ public class FilterTest extends MessageBusTest {
@Test
public void testSubtypesOnly(){
MBassador bus = getBus(BusConfiguration.SyncAsync());
MBassador bus = createBus(SyncAsync());
ListenerFactory listeners = new ListenerFactory()
.create(100, TestMessageHandler.class);

View File

@ -28,7 +28,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.synchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
final MBassador bus = createBus(SyncAsync(), listeners);
Runnable publishAndCheck = new Runnable() {
@ -62,7 +62,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
final MBassador bus = createBus(SyncAsync(), listeners);
final MessageManager messageManager = new MessageManager();
Runnable publishAndCheck = new Runnable() {
@ -94,7 +94,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.asynchronous())
.create(InstancesPerListener, Listeners.noHandlers());
final MBassador bus = getBus(BusConfiguration.SyncAsync(), listeners);
final MBassador bus = createBus(SyncAsync(), listeners);
final MessageManager messageManager = new MessageManager();
@ -132,7 +132,7 @@ public class MBassadorTest extends MessageBusTest {
}
};
final MBassador bus = new MBassador(BusConfiguration.SyncAsync());
final MBassador bus = new MBassador(SyncAsync());
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -41,7 +41,7 @@ public class MethodDispatchTest extends MessageBusTest{
@Test
public void testDispatch1(){
IMessageBus bus = getBus(BusConfiguration.SyncAsync());
IMessageBus bus = createBus(SyncAsync());
EventListener2 listener2 = new EventListener2();
bus.subscribe(listener2);
bus.post("jfndf").now();

View File

@ -2,8 +2,7 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.BusFactory;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.ISyncMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.common.GenericMessagePublicationSupport;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;
@ -32,12 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class SyncBusTest extends MessageBusTest {
protected abstract ISyncMessageBus getSyncMessageBus();
protected abstract GenericMessagePublicationSupport getSyncMessageBus();
@Test
public void testSynchronousMessagePublication() throws Exception {
final ISyncMessageBus bus = getSyncMessageBus();
final GenericMessagePublicationSupport bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -88,7 +87,7 @@ public abstract class SyncBusTest extends MessageBusTest {
}
};
final ISyncMessageBus bus = getSyncMessageBus();
final GenericMessagePublicationSupport bus = getSyncMessageBus();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
@ -114,7 +113,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Test
public void testCustomHandlerInvocation(){
final ISyncMessageBus bus = getSyncMessageBus();
final GenericMessagePublicationSupport bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, CustomInvocationListener.class)
.create(InstancesPerListener, Object.class);
@ -148,7 +147,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Test
public void testHandlerPriorities(){
final ISyncMessageBus bus = getSyncMessageBus();
final GenericMessagePublicationSupport bus = getSyncMessageBus();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, PrioritizedListener.class)
.create(InstancesPerListener, Object.class);
@ -177,8 +176,8 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
return new MBassador(BusConfiguration.SyncAsync());
protected GenericMessagePublicationSupport getSyncMessageBus() {
return new MBassador();
}
}
@ -187,7 +186,7 @@ public abstract class SyncBusTest extends MessageBusTest {
@Override
protected ISyncMessageBus getSyncMessageBus() {
protected GenericMessagePublicationSupport getSyncMessageBus() {
return BusFactory.SynchronousOnly();
}
}

View File

@ -2,7 +2,6 @@ package net.engio.mbassy;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.common.MessageBusTest;
@ -30,10 +29,10 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Test
public void testSynchronizedWithSynchronousInvocation(){
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
IBusConfiguration config = BusConfiguration.SyncAsync();
IBusConfiguration config = SyncAsync();
config.getFeature(Feature.AsynchronousMessageDispatch.class)
.setNumberOfMessageDispatchers(6);
IMessageBus bus = getBus(config);
IMessageBus bus = createBus(config);
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
handlers.add(handler);
@ -58,10 +57,10 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Test
public void testSynchronizedWithAsSynchronousInvocation(){
List<SynchronizedWithAsynchronousDelivery> handlers = new LinkedList<SynchronizedWithAsynchronousDelivery>();
IBusConfiguration config = BusConfiguration.SyncAsync();
IBusConfiguration config = SyncAsync();
config.getFeature(Feature.AsynchronousMessageDispatch.class)
.setNumberOfMessageDispatchers(6);
IMessageBus bus = getBus(config);
IMessageBus bus = createBus(config);
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithAsynchronousDelivery handler = new SynchronizedWithAsynchronousDelivery();
handlers.add(handler);
@ -69,10 +68,9 @@ public class SynchronizedHandlerTest extends MessageBusTest {
}
for(int i = 0; i < numberOfMessages; i++){
bus.post(new Object()).asynchronously();
track(bus.post(new Object()).asynchronously());
}
// TODO: wait for publication to finish
pause(10000);
for(SynchronizedWithAsynchronousDelivery handler : handlers){
@ -81,6 +79,8 @@ public class SynchronizedHandlerTest extends MessageBusTest {
}
public static class SynchronizedWithSynchronousDelivery {
private int counter = 0;

View File

@ -3,6 +3,8 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
@ -39,23 +41,35 @@ public abstract class MessageBusTest extends AssertSupport {
@Before
public void setUp(){
issuedPublications = new StrongConcurrentSet<MessagePublication>();
for(MessageTypes mes : MessageTypes.values())
mes.reset();
}
public MBassador getBus(IBusConfiguration configuration) {
public static IBusConfiguration SyncAsync() {
return new BusConfiguration()
.addFeature(Feature.SyncPubSub.Default())
.addFeature(Feature.AsynchronousHandlerInvocation.Default())
.addFeature(Feature.AsynchronousMessageDispatch.Default());
}
public MBassador createBus(IBusConfiguration configuration) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
return bus;
}
public MBassador getBus(IBusConfiguration configuration, ListenerFactory listeners) {
public MBassador createBus(IBusConfiguration configuration, ListenerFactory listeners) {
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
return bus;
}
protected void track(MessagePublication asynchronously) {
issuedPublications.add(asynchronously);
}
public void waitForPublications(long timeOutInMs){
long start = System.currentTimeMillis();
while(issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){