Using the disruptor to deliver messages instead of a BlockingQueue+threads

This commit is contained in:
nathan 2015-02-05 03:52:31 +01:00
parent c0d19fcde4
commit fb985b6ef1
5 changed files with 155 additions and 61 deletions

View File

@ -1,82 +1,92 @@
package net.engio.mbassy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.disruptor.EventBusFactory;
import net.engio.mbassy.disruptor.EventProcessor;
import net.engio.mbassy.disruptor.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
*/
public class MBassador extends AbstractPubSubSupport implements IMessageBus {
private final int numberOfMessageDispatchers;
// all threads that are available for asynchronous message dispatching
private final List<Thread> dispatchers;
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<Object> pendingMessages = new LinkedBlockingQueue<Object>(Integer.MAX_VALUE / 16);
protected static final ThreadFactory MessageDispatchThreadFactory = new ThreadFactory() {
/**
* The stack size is arbitrary based on JVM implementation. Default is 0
* 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
*<p>
* To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
* Setting the size MAY or MAY NOT have any effect!!!
* <p>
* Stack size must be specified in bytes. Default is 8k
*/
public static int stackSizeForThreads = 8192;
private static final ThreadFactory namedThreadFactory = new ThreadFactory() {
private final AtomicInteger threadID = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);// do not prevent the JVM from exiting
thread.setName("Dispatcher-" + this.threadID.getAndIncrement());
return thread;
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("MessageBus-");
stringBuilder.append(this.threadID.getAndIncrement());
// stack size is arbitrary based on JVM implementation. Default is 0
// 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
// To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
// Setting the size MAY or MAY NOT have any effect!!!
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, stringBuilder.toString(), stackSizeForThreads);
t.setDaemon(true);// do not prevent the JVM from exiting
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
// any new thread will be 'NON-DAEMON', so that it will be allowed to finish it's task before permitting the JVM to shut down
private final ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory);
private final int ringBufferSize = 2048;
private final Disruptor<MessageHolder> disruptor;
private final RingBuffer<MessageHolder> ringBuffer;
public MBassador() {
this(6);
this(Runtime.getRuntime().availableProcessors() - 1);
}
public MBassador(int numberOfMessageDispatchers) {
// must be power of 2.
public MBassador(int numberOfThreads) {
super();
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
this.dispatchers = new ArrayList<Thread>(numberOfMessageDispatchers);
initDispatcherThreads();
}
// initialize the dispatch workers
private void initDispatcherThreads() {
for (int i = 0; i < this.numberOfMessageDispatchers; i++) {
// each thread will run forever and process incoming
// message publication requests
Thread dispatcher = MessageDispatchThreadFactory.newThread(new Runnable() {
@Override
public void run() {
Object message = null;
while (true) {
try {
message = MBassador.this.pendingMessages.take();
publishMessage(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", message));
}
}
}
});
dispatcher.setName("Message dispatcher");
this.dispatchers.add(dispatcher);
dispatcher.start();
if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 thread.
}
EventBusFactory factory = new EventBusFactory();
EventProcessor procs[] = new EventProcessor[numberOfThreads];
for (int i = 0; i < procs.length; i++) {
procs[i] = new EventProcessor(this, i, procs.length);
}
this.disruptor = new Disruptor<MessageHolder>(factory, this.ringBufferSize, this.executor, ProducerType.MULTI, new SleepingWaitStrategy());
// tell the disruptor to handle procs first, then results. IN ORDER.
this.disruptor.handleEventsWith(procs);
this.ringBuffer = this.disruptor.start();
}
@ -94,19 +104,48 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
@Override
public void publishAsync(Object message) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// setup the job
final long seq = ringBuffer.next();
try {
this.pendingMessages.put(message);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.message = message;
} catch (Exception e) {
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message", message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message) {
// put this on the disruptor ring buffer
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
// Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// to become available.
while (!ringBuffer.hasAvailableCapacity(1)) {
LockSupport.parkNanos(10L);
if (expireTimestamp <= System.currentTimeMillis()) {
handlePublicationError(new PublicationError(new Exception("Timeout"), "Error while adding an asynchronous message", message));
return;
}
}
// setup the job
final long seq = ringBuffer.next();
try {
this.pendingMessages.offer(message, timeout, unit);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", message));
MessageHolder eventJob = ringBuffer.get(seq);
eventJob.message = message;
} catch (Exception e) {
handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message", message));
} finally {
// always publish the job
ringBuffer.publish(seq);
}
}
@ -118,13 +157,12 @@ public class MBassador extends AbstractPubSubSupport implements IMessageBus {
@Override
public void shutdown() {
for (Thread dispatcher : this.dispatchers) {
dispatcher.interrupt();
}
this.disruptor.shutdown();
this.executor.shutdown();
}
@Override
public boolean hasPendingMessages() {
return this.pendingMessages.size() > 0;
return this.ringBuffer.remainingCapacity() != this.ringBufferSize;
}
}

View File

@ -52,7 +52,7 @@ public abstract class AbstractPubSubSupport implements PubSubSupport, ErrorHandl
}
}
protected void publishMessage(Object message) {
public void publishMessage(Object message) {
Class<? extends Object> messageClass = message.getClass();
// TODO: convert this to have N number of message types

View File

@ -0,0 +1,16 @@
package net.engio.mbassy.disruptor;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import com.lmax.disruptor.EventFactory;
public class EventBusFactory implements EventFactory<MessageHolder> {
public EventBusFactory() {
}
@Override
public MessageHolder newInstance() {
return new MessageHolder();
}
}

View File

@ -0,0 +1,31 @@
package net.engio.mbassy.disruptor;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import net.engio.mbassy.bus.error.PublicationError;
import com.lmax.disruptor.EventHandler;
public class EventProcessor implements EventHandler<MessageHolder> {
private final AbstractPubSubSupport publisher;
private final long ordinal;
private final long numberOfConsumers;
public EventProcessor(AbstractPubSubSupport publisher, final long ordinal, final long numberOfConsumers) {
this.publisher = publisher;
this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers;
}
@Override
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
if (sequence % this.numberOfConsumers == this.ordinal) {
try {
this.publisher.publishMessage(event.message);
} catch (Throwable t) {
this.publisher.handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", event.message));
}
event.message = null; // cleanup
}
}
}

View File

@ -0,0 +1,9 @@
package net.engio.mbassy.disruptor;
public class MessageHolder {
public Object message;
public MessageHolder() {
}
}