Cleaned up sync/async publication

This commit is contained in:
nathan 2016-02-06 23:40:58 +01:00
parent 969e21d762
commit 4d5b53fa36
6 changed files with 230 additions and 246 deletions

View File

@ -135,15 +135,15 @@ class MessageBus implements IMessageBus {
// the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue
if (useDisruptorForAsyncPublish) {
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, publisher, syncPublication);
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication);
} else {
if (useNoGarbageVersionOfABQ) {
// no garbage is created, but this is slow (but faster than other messagebus implementations)
asyncPublication = new AsyncABQ_noGc(numberOfThreads, errorHandler, publisher, syncPublication);
asyncPublication = new AsyncABQ_noGc(numberOfThreads, errorHandler, syncPublication);
}
else {
// garbage is created, but this is fast
asyncPublication = new AsyncABQ(numberOfThreads, errorHandler, publisher, syncPublication);
asyncPublication = new AsyncABQ(numberOfThreads, errorHandler, syncPublication);
}
}
}

View File

@ -67,12 +67,6 @@ class Subscription {
this.handler = handler;
entries = new IdentityMap<Object, Entry>(32, SubscriptionManager.LOAD_FACTOR);
if (handler.acceptsSubtypes()) {
// TODO keep a list of "super-class" messages that access this. This is updated by multiple threads. This is so we know WHAT
// super-subscriptions to clear when we sub/unsub
}
}
// called on shutdown for GC purposes
@ -143,21 +137,12 @@ class Subscription {
return this.entries.size;
}
/**
* @return true if messages were published
*/
public abstract
void publish(final Object message) throws Throwable;
/**
* @return true if messages were published
*/
public abstract
void publish(final Object message1, final Object message2) throws Throwable;
/**
* @return true if messages were published
*/
public abstract
void publish(final Object message1, final Object message2, final Object message3) throws Throwable;

View File

@ -18,7 +18,6 @@ package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.disruptor.MessageType;
@ -47,110 +46,100 @@ class AsyncABQ implements Synchrony {
public
AsyncABQ(final int numberOfThreads,
final ErrorHandlingSupport errorHandler,
final Publisher publisher,
final Synchrony syncPublication) {
AsyncABQ(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) {
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings("ConstantConditions")
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ.this.dispatchQueue;
final Synchrony syncPublication1 = syncPublication;
final ErrorHandlingSupport errorHandler1 = errorHandler;
while (!AsyncABQ.this.shuttingDown) {
process(IN_QUEUE, syncPublication1, errorHandler1);
}
}
};
this.threads = new ArrayDeque<Thread>(numberOfThreads);
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ.this.dispatchQueue;
final Publisher publisher1 = publisher;
final Synchrony syncPublication1 = syncPublication;
final ErrorHandlingSupport errorHandler1 = errorHandler;
MessageHolder event = null;
int messageType = MessageType.ONE;
Object message1 = null;
Object message2 = null;
Object message3 = null;
while (!AsyncABQ.this.shuttingDown) {
try {
event = IN_QUEUE.take();
messageType = event.type;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
switch (messageType) {
case MessageType.ONE: {
publisher1.publish(syncPublication1, message1);
break;
}
case MessageType.TWO: {
publisher1.publish(syncPublication1, message1, message2);
break;
}
case MessageType.THREE: {
publisher1.publish(syncPublication1, message3, message1, message2);
break;
}
}
} catch (InterruptedException e) {
if (!AsyncABQ.this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
case MessageType.TWO: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1, message2);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
case MessageType.THREE: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1, message2, message3);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
}
}
}
}
}
};
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
}
}
@SuppressWarnings("Duplicates")
private
void process(final ArrayBlockingQueue<MessageHolder> queue, final Synchrony sync, final ErrorHandlingSupport errorHandler) {
MessageHolder event = null;
int messageType = MessageType.ONE;
Subscription[] subscriptions;
Object message1 = null;
Object message2 = null;
Object message3 = null;
try {
event = queue.take();
messageType = event.type;
subscriptions = event.subscriptions;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
switch (messageType) {
case MessageType.ONE: {
sync.publish(subscriptions, message1);
return;
}
case MessageType.TWO: {
sync.publish(subscriptions, message1, message2);
return;
}
case MessageType.THREE: {
sync.publish(subscriptions, message1, message2, message3);
//noinspection UnnecessaryReturnStatement
return;
}
}
} catch (Throwable e) {
if (event != null) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1));
return;
}
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement
return;
}
}
}
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
MessageHolder take = new MessageHolder();
take.type = MessageType.ONE;
take.subscriptions = subscriptions;
take.message1 = message1;
@ -162,6 +151,7 @@ class AsyncABQ implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
MessageHolder take = new MessageHolder();
take.type = MessageType.TWO;
take.subscriptions = subscriptions;
take.message1 = message1;
@ -174,6 +164,7 @@ class AsyncABQ implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
MessageHolder take = new MessageHolder();
take.type = MessageType.THREE;
take.subscriptions = subscriptions;
take.message1 = message1;

View File

@ -18,7 +18,6 @@ package dorkbox.util.messagebus.synchrony;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.disruptor.MessageType;
@ -42,7 +41,6 @@ class AsyncABQ_noGc implements Synchrony {
// have two queues to prevent garbage, So we pull off one queue to add to another queue and when done, we put it back
private final ArrayBlockingQueue<MessageHolder> gcQueue;
private final Collection<Thread> threads;
/**
@ -52,10 +50,7 @@ class AsyncABQ_noGc implements Synchrony {
public
AsyncABQ_noGc(final int numberOfThreads,
final ErrorHandlingSupport errorHandler,
final Publisher publisher,
final Synchrony syncPublication) {
AsyncABQ_noGc(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) {
this.dispatchQueue = new ArrayBlockingQueue<MessageHolder>(1024);
this.gcQueue = new ArrayBlockingQueue<MessageHolder>(1024);
@ -65,107 +60,103 @@ class AsyncABQ_noGc implements Synchrony {
gcQueue.add(new MessageHolder());
}
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings("ConstantConditions")
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ_noGc.this.dispatchQueue;
final ArrayBlockingQueue<MessageHolder> OUT_QUEUE = AsyncABQ_noGc.this.gcQueue;
final Synchrony syncPublication1 = syncPublication;
final ErrorHandlingSupport errorHandler1 = errorHandler;
while (!AsyncABQ_noGc.this.shuttingDown) {
process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1);
}
}
};
this.threads = new ArrayDeque<Thread>(numberOfThreads);
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@Override
public
void run() {
final ArrayBlockingQueue<MessageHolder> IN_QUEUE = AsyncABQ_noGc.this.dispatchQueue;
final ArrayBlockingQueue<MessageHolder> OUT_QUEUE = AsyncABQ_noGc.this.gcQueue;
final Publisher publisher1 = publisher;
final Synchrony syncPublication1 = syncPublication;
final ErrorHandlingSupport errorHandler1 = errorHandler;
MessageHolder event = null;
int messageType = MessageType.ONE;
Object message1 = null;
Object message2 = null;
Object message3 = null;
while (!AsyncABQ_noGc.this.shuttingDown) {
try {
event = IN_QUEUE.take();
messageType = event.type;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
OUT_QUEUE.put(event);
switch (messageType) {
case MessageType.ONE: {
publisher1.publish(syncPublication1, message1);
break;
}
case MessageType.TWO: {
publisher1.publish(syncPublication1, message1, message2);
break;
}
case MessageType.THREE: {
publisher1.publish(syncPublication1, message3, message1, message2);
break;
}
}
} catch (InterruptedException e) {
if (!AsyncABQ_noGc.this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
case MessageType.TWO: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1, message2);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
case MessageType.THREE: {
PublicationError publicationError = new PublicationError()
.setMessage("Thread interrupted while processing message")
.setCause(e);
if (event != null) {
publicationError.setPublishedObject(message1, message2, message3);
}
errorHandler1.handlePublicationError(publicationError);
break;
}
}
}
}
}
}
};
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
}
}
@SuppressWarnings("Duplicates")
private
void process(final ArrayBlockingQueue<MessageHolder> queue,
final ArrayBlockingQueue<MessageHolder> gcQueue,
final Synchrony sync,
final ErrorHandlingSupport errorHandler) {
MessageHolder event = null;
int messageType = MessageType.ONE;
Subscription[] subscriptions;
Object message1 = null;
Object message2 = null;
Object message3 = null;
try {
event = queue.take();
messageType = event.type;
subscriptions = event.subscriptions;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
gcQueue.put(event);
switch (messageType) {
case MessageType.ONE: {
sync.publish(subscriptions, message1);
return;
}
case MessageType.TWO: {
sync.publish(subscriptions, message1, message2);
return;
}
case MessageType.THREE: {
sync.publish(subscriptions, message1, message2, message3);
//noinspection UnnecessaryReturnStatement
return;
}
}
} catch (Throwable e) {
if (event != null) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1));
return;
}
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
//noinspection UnnecessaryReturnStatement
return;
}
}
}
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
MessageHolder take = gcQueue.take();
take.type = MessageType.ONE;
take.subscriptions = subscriptions;
take.message1 = message1;
@ -177,6 +168,7 @@ class AsyncABQ_noGc implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2) throws Throwable {
MessageHolder take = gcQueue.take();
take.type = MessageType.TWO;
take.subscriptions = subscriptions;
take.message1 = message1;
@ -189,6 +181,7 @@ class AsyncABQ_noGc implements Synchrony {
public
void publish(final Subscription[] subscriptions, final Object message1, final Object message2, final Object message3) throws Throwable {
MessageHolder take = gcQueue.take();
take.type = MessageType.THREE;
take.subscriptions = subscriptions;
take.message1 = message1;

View File

@ -15,22 +15,14 @@
*/
package dorkbox.util.messagebus.synchrony;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import com.lmax.disruptor.*;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.disruptor.EventBusFactory;
import dorkbox.util.messagebus.synchrony.disruptor.MessageHandler;
import dorkbox.util.messagebus.synchrony.disruptor.MessageType;
import dorkbox.util.messagebus.synchrony.disruptor.PublicationExceptionHandler;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -43,15 +35,13 @@ import java.util.concurrent.locks.LockSupport;
public final
class AsyncDisruptor implements Synchrony {
private final ErrorHandlingSupport errorHandler;
private WorkProcessor[] workProcessors;
private MessageHandler[] handlers;
private RingBuffer<MessageHolder> ringBuffer;
private Sequence workSequence;
public
AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Publisher publisher, final Synchrony syncPublication) {
this.errorHandler = errorHandler;
AsyncDisruptor(final int numberOfThreads, final ErrorHandlingSupport errorHandler, final Synchrony syncPublication) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
@ -65,7 +55,7 @@ class AsyncDisruptor implements Synchrony {
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(publisher, syncPublication); // exactly one per thread is used
handlers[i] = new MessageHandler(syncPublication, errorHandler); // exactly one per thread is used
}
@ -79,14 +69,14 @@ class AsyncDisruptor implements Synchrony {
WaitStrategy consumerWaitStrategy;
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good blocking one
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy(); // best for low latency
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(10, 50, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy()); // good combo
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);

View File

@ -17,51 +17,77 @@ package dorkbox.util.messagebus.synchrony.disruptor;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.MessageHolder;
import dorkbox.util.messagebus.synchrony.Synchrony;
import dorkbox.util.messagebus.publication.Publisher;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author dorkbox, llc
* Date: 2/2/15
* @author dorkbox, llc Date: 2/2/15
*/
public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final Publisher publisher;
private final Synchrony syncPublication;
private final ErrorHandlingSupport errorHandler;
AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public
MessageHandler(final Publisher publisher, final Synchrony syncPublication) {
this.publisher = publisher;
MessageHandler(final Synchrony syncPublication, final ErrorHandlingSupport errorHandler) {
this.syncPublication = syncPublication;
this.errorHandler = errorHandler;
}
@SuppressWarnings("Duplicates")
@Override
public
void onEvent(final MessageHolder event) throws Exception {
final int messageType = event.type;
switch (messageType) {
case MessageType.ONE: {
this.publisher.publish(syncPublication, event.message1);
return;
final Subscription[] subscriptions = event.subscriptions;
try {
switch (messageType) {
case MessageType.ONE: {
syncPublication.publish(subscriptions, event.message1);
return;
}
case MessageType.TWO: {
syncPublication.publish(subscriptions, event.message1, event.message2);
return;
}
case MessageType.THREE: {
syncPublication.publish(subscriptions, event.message1, event.message2, event.message3);
//noinspection UnnecessaryReturnStatement
return;
}
}
case MessageType.TWO: {
Object message1 = event.message1;
Object message2 = event.message2;
this.publisher.publish(syncPublication, message1, message2);
return;
}
case MessageType.THREE: {
Object message1 = event.message1;
Object message2 = event.message2;
Object message3 = event.message3;
this.publisher.publish(syncPublication, message3, message1, message2);
return;
} catch (Throwable e) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(event.message1));
return;
}
case MessageType.TWO: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(event.message1, event.message2));
return;
}
case MessageType.THREE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(event.message1,
event.message2,
event.message3));
//noinspection UnnecessaryReturnStatement
return;
}
}
}
}
@ -69,7 +95,6 @@ class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
@Override
public
void onStart() {
}
@Override