The new disruptor queue is a LOT faster, and thus the concurrent tests were failing. Fixed the checks in the test to a better way of checking.

This commit is contained in:
nathan 2019-06-07 12:35:04 +02:00
parent 621140695f
commit 89eee9ec9f

View File

@ -136,38 +136,54 @@ public class MBassadorTest extends MessageBusTest {
final MessageBus bus = new MessageBus();
bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);
ListenerFactory listeners = new ListenerFactory().create(InstancesPerListener, ExceptionThrowingListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
bus.publishAsync(new StandardMessage());
}
};
// single threaded
// multi threaded, 1 other thread
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
while (bus.hasPendingMessages()) {
// we want to wait a reasonable amount of time to check.
int count = 1000;
while (count != 0 && InstancesPerListener != exceptionCount.get()) {
count--;
pause(10);
}
// NOTE: this looks like a good idea, but it's not.
// while (bus.hasPendingMessages()) {
// pause(10);
// }
assertEquals(InstancesPerListener, exceptionCount.get());
// multi threaded
// multi threaded, `ConcurrentUnits` other threads
int expected = InstancesPerListener * ConcurrentUnits;
exceptionCount.set(0);
// we want to wait a reasonable amount of time to check.
count = 1000;
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
while (bus.hasPendingMessages()) {
while (count != 0 && expected != exceptionCount.get()) {
count--;
pause(10);
}
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
// NOTE: this looks like a good idea, but it's not.
// while (bus.hasPendingMessages()) {
// pause(10);
// }
assertEquals(expected, exceptionCount.get());
bus.shutdown();
}
}