Renamed to MessageBus. Code cleanup

This commit is contained in:
nathan 2015-06-08 00:29:17 +02:00
parent 31e9fe84b3
commit fa0319cdc3
16 changed files with 102 additions and 103 deletions

View File

@ -3,6 +3,7 @@ package dorkbox.util.messagebus.common;
import com.esotericsoftware.reflectasm.MethodAccess; import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized; import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.utils.ReflectionUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -14,7 +14,7 @@ public class NamedThreadFactory implements ThreadFactory {
/** /**
* The stack size is arbitrary based on JVM implementation. Default is 0 * The stack size is arbitrary based on JVM implementation. Default is 0
* 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k * 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
*<p> * <p>
* To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit) * To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
* Setting the size MAY or MAY NOT have any effect!!! * Setting the size MAY or MAY NOT have any effect!!!
* <p> * <p>
@ -42,12 +42,14 @@ public class NamedThreadFactory implements ThreadFactory {
if (stackSize != null) { if (stackSize != null) {
int value = 0; int value = 0;
if (stackSize.endsWith("k")) { if (stackSize.endsWith("k")) {
stackSize = stackSize.substring(4, stackSize.length()-1); stackSize = stackSize.substring(4, stackSize.length() - 1);
value = Integer.parseInt(stackSize) * 1024; value = Integer.parseInt(stackSize) * 1024;
} else if (stackSize.endsWith("m")) { }
stackSize = stackSize.substring(4, stackSize.length()-1); else if (stackSize.endsWith("m")) {
stackSize = stackSize.substring(4, stackSize.length() - 1);
value = Integer.parseInt(stackSize) * 1024 * 1024; value = Integer.parseInt(stackSize) * 1024 * 1024;
} else { }
else {
try { try {
value = Integer.parseInt(stackSize.substring(4)); value = Integer.parseInt(stackSize.substring(4));
} catch (Exception e) { } catch (Exception e) {
@ -55,7 +57,8 @@ public class NamedThreadFactory implements ThreadFactory {
} }
stackSizeForThreads = value; stackSizeForThreads = value;
} else { }
else {
stackSizeForThreads = 8192; stackSizeForThreads = 8192;
} }
} }
@ -81,7 +84,7 @@ public class NamedThreadFactory implements ThreadFactory {
} }
public Thread newThread(String name, Runnable r) { public Thread newThread(String name, Runnable r) {
// stack size is arbitrary based on JVM implementation. Default is 0 // stack size is arbitrary based on JVM implementation. Default is 0
// 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k // 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
// To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit) // To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
// Setting the size MAY or MAY NOT have any effect!!! // Setting the size MAY or MAY NOT have any effect!!!

View File

@ -1,6 +1,12 @@
package dorkbox.util.messagebus.subscription; package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.*; import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -44,21 +44,20 @@ public class ReflectionUtils {
} }
/** /**
* Traverses the class hierarchy upwards, starting at the given subclass, looking * Traverses the class hierarchy upwards, starting at the given subclass, looking
* for an override of the given methods -> finds the bottom most override of the given * for an override of the given methods -> finds the bottom most override of the given
* method if any exists * method if any exists
* *
* @param overridingMethod * @param overridingMethod
* @param subclass * @param subclass
* @return * @return
*/ */
public static Method getOverridingMethod( final Method overridingMethod, final Class<?> subclass ) { public static Method getOverridingMethod(final Method overridingMethod, final Class<?> subclass) {
Class<?> current = subclass; Class<?> current = subclass;
while ( !current.equals( overridingMethod.getDeclaringClass() ) ) { while (!current.equals(overridingMethod.getDeclaringClass())) {
try { try {
return current.getDeclaredMethod( overridingMethod.getName(), overridingMethod.getParameterTypes() ); return current.getDeclaredMethod(overridingMethod.getName(), overridingMethod.getParameterTypes());
} } catch (NoSuchMethodException e) {
catch ( NoSuchMethodException e ) {
current = current.getSuperclass(); current = current.getSuperclass();
} }
} }
@ -75,12 +74,12 @@ public class ReflectionUtils {
public static Class<?>[] getSuperTypes(Class<?> from) { public static Class<?>[] getSuperTypes(Class<?> from) {
ArrayList<Class<?>> superclasses = new ArrayList<Class<?>>(); ArrayList<Class<?>> superclasses = new ArrayList<Class<?>>();
collectInterfaces( from, superclasses ); collectInterfaces(from, superclasses);
while ( !from.equals( Object.class ) && !from.isInterface() ) { while (!from.equals(Object.class) && !from.isInterface()) {
superclasses.add( from.getSuperclass() ); superclasses.add(from.getSuperclass());
from = from.getSuperclass(); from = from.getSuperclass();
collectInterfaces( from, superclasses ); collectInterfaces(from, superclasses);
} }
final Class<?>[] classes = new Class<?>[superclasses.size()]; final Class<?>[] classes = new Class<?>[superclasses.size()];
@ -88,10 +87,10 @@ public class ReflectionUtils {
return classes; return classes;
} }
public static void collectInterfaces( Class<?> from, Collection<Class<?>> accumulator ) { public static void collectInterfaces(Class<?> from, Collection<Class<?>> accumulator) {
for ( Class<?> intface : from.getInterfaces() ) { for (Class<?> intface : from.getInterfaces()) {
accumulator.add( intface ); accumulator.add(intface);
collectInterfaces( intface, accumulator ); collectInterfaces(intface, accumulator);
} }
} }
@ -99,7 +98,7 @@ public class ReflectionUtils {
final int length = allMethods.length; final int length = allMethods.length;
Method method; Method method;
for (int i=0;i<length;i++) { for (int i = 0; i < length; i++) {
method = allMethods[i]; method = allMethods[i];
if (isOverriddenBy(methodToCheck, method)) { if (isOverriddenBy(methodToCheck, method)) {
@ -112,44 +111,45 @@ public class ReflectionUtils {
/** /**
* Searches for an Annotation of the given type on the class. Supports meta annotations. * Searches for an Annotation of the given type on the class. Supports meta annotations.
* *
* @param from AnnotatedElement (class, method...) * @param from AnnotatedElement (class, method...)
* @param annotationType Annotation class to look for. * @param annotationType Annotation class to look for.
* @param <A> Class of annotation type * @param <A> Class of annotation type
* @return Annotation instance or null * @return Annotation instance or null
*/ */
private static <A extends Annotation> A getAnnotation(AnnotatedElement from, Class<A> annotationType, Collection<AnnotatedElement> visited) { private static <A extends Annotation> A getAnnotation(AnnotatedElement from, Class<A> annotationType,
if( visited.contains(from) ) { Collection<AnnotatedElement> visited) {
if (visited.contains(from)) {
return null; return null;
} }
visited.add(from); visited.add(from);
A ann = from.getAnnotation( annotationType ); A ann = from.getAnnotation(annotationType);
if( ann != null) { if (ann != null) {
return ann; return ann;
} }
for ( Annotation metaAnn : from.getAnnotations() ) { for (Annotation metaAnn : from.getAnnotations()) {
ann = getAnnotation(metaAnn.annotationType(), annotationType, visited); ann = getAnnotation(metaAnn.annotationType(), annotationType, visited);
if ( ann != null ) { if (ann != null) {
return ann; return ann;
} }
} }
return null; return null;
} }
public static <A extends Annotation> A getAnnotation( AnnotatedElement from, Class<A> annotationType) { public static <A extends Annotation> A getAnnotation(AnnotatedElement from, Class<A> annotationType) {
A annotation = getAnnotation(from, annotationType, new ConcurrentSet<AnnotatedElement>(16, .8F, 1)); A annotation = getAnnotation(from, annotationType, new ConcurrentSet<AnnotatedElement>(16, .8F, 1));
return annotation; return annotation;
} }
// //
private static boolean isOverriddenBy(final Method superclassMethod, final Method subclassMethod ) { private static boolean isOverriddenBy(final Method superclassMethod, final Method subclassMethod) {
// if the declaring classes are the same or the subclass method is not defined in the subclass // if the declaring classes are the same or the subclass method is not defined in the subclass
// hierarchy of the given superclass method or the method names are not the same then // hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod // subclassMethod does not override superclassMethod
if ( superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass() ) if (superclassMethod.getDeclaringClass().equals(subclassMethod.getDeclaringClass()) ||
|| !superclassMethod.getDeclaringClass().isAssignableFrom( subclassMethod.getDeclaringClass() ) !superclassMethod.getDeclaringClass().isAssignableFrom(subclassMethod.getDeclaringClass()) ||
|| !superclassMethod.getName().equals(subclassMethod.getName())) { !superclassMethod.getName().equals(subclassMethod.getName())) {
return false; return false;
} }
@ -158,8 +158,8 @@ public class ReflectionUtils {
// method must specify the same number of parameters // method must specify the same number of parameters
//the parameters must occur in the exact same order //the parameters must occur in the exact same order
for ( int i = 0; i < subClassMethodParameters.length; i++ ) { for (int i = 0; i < subClassMethodParameters.length; i++) {
if ( !superClassMethodParameters[i].equals( subClassMethodParameters[i] ) ) { if (!superClassMethodParameters[i].equals(subClassMethodParameters[i])) {
return false; return false;
} }
} }

View File

@ -5,7 +5,6 @@ import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.thread.ClassHolder; import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.utils.ClassUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;

View File

@ -80,7 +80,6 @@ public final class VarArgUtils {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions // and then, returns the array'd version superclass subscriptions

View File

@ -17,7 +17,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test @Test
public void testSingleThreadedSyncFIFO(){ public void testSingleThreadedSyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners // create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MultiMBassador(); IMessageBus fifoBUs = new MessageBus();
fifoBUs.start(); fifoBUs.start();
List<Listener> listeners = new LinkedList<Listener>(); List<Listener> listeners = new LinkedList<Listener>();
@ -53,7 +53,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
@Test @Test
public void testSingleThreadedSyncAsyncFIFO(){ public void testSingleThreadedSyncAsyncFIFO(){
// create a fifo bus with 1000 concurrently subscribed listeners // create a fifo bus with 1000 concurrently subscribed listeners
IMessageBus fifoBUs = new MultiMBassador(1); IMessageBus fifoBUs = new MessageBus(1);
List<Listener> listeners = new LinkedList<Listener>(); List<Listener> listeners = new LinkedList<Listener>();
for(int i = 0; i < 1000 ; i++){ for(int i = 0; i < 1000 ; i++){

View File

@ -1,19 +1,13 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized; import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.MessageBusTest; import dorkbox.util.messagebus.common.MessageBusTest;
import org.junit.Test;
import java.lang.annotation.*;
import java.util.HashSet;
import java.util.Set;
/** /**
* Tests a custom handler annotation with a @Handler meta annotation and a default filter. * Tests a custom handler annotation with a @Handler meta annotation and a default filter.
@ -95,7 +89,7 @@ public class CustomHandlerAnnotationTest extends MessageBusTest
@Test @Test
public void testMetaHandlerFiltering() { public void testMetaHandlerFiltering() {
MultiMBassador bus = createBus(); MessageBus bus = createBus();
NamedMessageListener listener = new NamedMessageListener(); NamedMessageListener listener = new NamedMessageListener();
bus.subscribe( listener ); bus.subscribe( listener );

View File

@ -1,20 +1,14 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.listeners.IMessageListener; import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener; import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.ObjectListener; import dorkbox.util.messagebus.listeners.ObjectListener;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Verify correct behaviour in case of message publications that do not have any matching subscriptions * Verify correct behaviour in case of message publications that do not have any matching subscriptions
@ -35,7 +29,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test @Test
public void testDeadMessage(){ public void testDeadMessage(){
final MultiMBassador bus = createBus(); final MessageBus bus = createBus();
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -71,7 +65,7 @@ public class DeadMessageTest extends MessageBusTest{
@Test @Test
public void testUnsubscribingAllListeners() { public void testUnsubscribingAllListeners() {
final MultiMBassador bus = createBus(); final MessageBus bus = createBus();
ListenerFactory deadMessageListener = new ListenerFactory() ListenerFactory deadMessageListener = new ListenerFactory()
.create(InstancesPerListener, DeadMessagHandler.class); .create(InstancesPerListener, DeadMessagHandler.class);

View File

@ -29,7 +29,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.synchronous()) .create(InstancesPerListener, Listeners.synchronous())
.create(InstancesPerListener, Listeners.noHandlers()); .create(InstancesPerListener, Listeners.noHandlers());
final MultiMBassador bus = createBus(listeners); final MessageBus bus = createBus(listeners);
Runnable publishAndCheck = new Runnable() { Runnable publishAndCheck = new Runnable() {
@ -63,7 +63,7 @@ public class MBassadorTest extends MessageBusTest {
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, Listeners.noHandlers()); .create(InstancesPerListener, Listeners.noHandlers());
final MultiMBassador bus = createBus(listeners); final MessageBus bus = createBus(listeners);
final MessageManager messageManager = new MessageManager(); final MessageManager messageManager = new MessageManager();
@ -100,7 +100,7 @@ public class MBassadorTest extends MessageBusTest {
} }
}; };
final MultiMBassador bus = new MultiMBassador(); final MessageBus bus = new MessageBus();
bus.addErrorHandler(ExceptionCounter); bus.addErrorHandler(ExceptionCounter);
bus.start(); bus.start();
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()

View File

@ -19,7 +19,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test @Test
public void testMultiMessageSending(){ public void testMultiMessageSending(){
IMessageBus bus = new MultiMBassador(); IMessageBus bus = new MessageBus();
bus.start(); bus.start();
Listener listener1 = new Listener(); Listener listener1 = new Listener();

View File

@ -15,7 +15,7 @@ public class PerfTest_MBassador {
final int warmupRuns = 4; final int warmupRuns = 4;
final int runs = 5; final int runs = 5;
MultiMBassador bus = new MultiMBassador(2); MessageBus bus = new MessageBus(2);
Listener listener1 = new Listener(); Listener listener1 = new Listener();
bus.subscribe(listener1); bus.subscribe(listener1);
@ -24,7 +24,8 @@ public class PerfTest_MBassador {
System.out.format("summary,PublishPerfTest, %,d\n", average); System.out.format("summary,PublishPerfTest, %,d\n", average);
} }
public static long averageRun(int warmUpRuns, int sumCount, MultiMBassador bus, boolean showStats, int concurrency, int repetitions) throws Exception { public static long averageRun(int warmUpRuns, int sumCount, MessageBus bus, boolean showStats, int concurrency, int repetitions)
throws Exception {
int runs = warmUpRuns + sumCount; int runs = warmUpRuns + sumCount;
final long[] results = new long[runs]; final long[] results = new long[runs];
for (int i = 0; i < runs; i++) { for (int i = 0; i < runs; i++) {
@ -40,7 +41,8 @@ public class PerfTest_MBassador {
return sum/sumCount; return sum/sumCount;
} }
private static long performanceRun(int runNumber, MultiMBassador bus, boolean showStats, int concurrency, int repetitions) throws Exception { private static long performanceRun(int runNumber, MessageBus bus, boolean showStats, int concurrency, int repetitions)
throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Thread[] threads = new Thread[concurrency*2]; Thread[] threads = new Thread[concurrency*2];
@ -82,19 +84,19 @@ public class PerfTest_MBassador {
} }
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final MultiMBassador bus; private final MessageBus bus;
volatile long start; volatile long start;
volatile long end; volatile long end;
private int repetitions; private int repetitions;
public Producer(MultiMBassador bus, int repetitions) { public Producer(MessageBus bus, int repetitions) {
this.bus = bus; this.bus = bus;
this.repetitions = repetitions; this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
MultiMBassador bus = this.bus; MessageBus bus = this.bus;
int i = this.repetitions; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();

View File

@ -3,11 +3,11 @@
*/ */
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import junit.framework.Assert;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import junit.framework.Assert;
/** /**
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
@ -29,7 +29,7 @@ public class PerformanceTest {
}; };
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final MultiMBassador bus = new MultiMBassador(CONCURRENCY_LEVEL); final MessageBus bus = new MessageBus(CONCURRENCY_LEVEL);
bus.addErrorHandler(TestFailingHandler); bus.addErrorHandler(TestFailingHandler);
@ -44,7 +44,8 @@ public class PerformanceTest {
while (true) { while (true) {
bus.publish(num); bus.publish(num);
} }
}}, CONCURRENCY_LEVEL); }
}, CONCURRENCY_LEVEL);
bus.shutdown(); bus.shutdown();

View File

@ -28,7 +28,7 @@ public class SyncBusTest extends MessageBusTest {
@Test @Test
public void testSynchronousMessagePublication() throws Exception { public void testSynchronousMessagePublication() throws Exception {
final IMessageBus bus = new MultiMBassador(); final IMessageBus bus = new MessageBus();
bus.start(); bus.start();
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DefaultListener.class)
@ -82,7 +82,7 @@ public class SyncBusTest extends MessageBusTest {
} }
}; };
final IMessageBus bus = new MultiMBassador(); final IMessageBus bus = new MessageBus();
bus.addErrorHandler(ExceptionCounter); bus.addErrorHandler(ExceptionCounter);
bus.start(); bus.start();
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.MultiMBassador; import dorkbox.util.messagebus.MessageBus;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.messages.MessageTypes; import dorkbox.util.messagebus.messages.MessageTypes;
@ -32,22 +32,22 @@ public abstract class MessageBusTest extends AssertSupport {
}; };
@Before @Before
public void setUp(){ public void setUp() {
for(MessageTypes mes : MessageTypes.values()) { for (MessageTypes mes : MessageTypes.values()) {
mes.reset(); mes.reset();
} }
} }
public MultiMBassador createBus() { public MessageBus createBus() {
MultiMBassador bus = new MultiMBassador(); MessageBus bus = new MessageBus();
bus.addErrorHandler(TestFailingHandler); bus.addErrorHandler(TestFailingHandler);
bus.start(); bus.start();
return bus; return bus;
} }
public MultiMBassador createBus(ListenerFactory listeners) { public MessageBus createBus(ListenerFactory listeners) {
MultiMBassador bus = new MultiMBassador(); MessageBus bus = new MessageBus();
bus.addErrorHandler(TestFailingHandler); bus.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
bus.start(); bus.start();

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.MultiMBassador; import dorkbox.util.messagebus.MessageBus;
import dorkbox.util.messagebus.PubSubSupport; import dorkbox.util.messagebus.PubSubSupport;
import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.subscription.SubscriptionManager;
@ -95,7 +95,7 @@ public class TestUtil {
} }
public static void setup(MultiMBassador bus, ListenerFactory listeners, int numberOfThreads) { public static void setup(MessageBus bus, ListenerFactory listeners, int numberOfThreads) {
setup(bus, listeners.getAll(), numberOfThreads); setup(bus, listeners.getAll(), numberOfThreads);
} }