WIP disruptor. WIP simple queue
This commit is contained in:
parent
e257aced31
commit
3c021f383c
104
src/main/java/com/lmax/disruptor/DisruptorQueue.java
Normal file
104
src/main/java/com/lmax/disruptor/DisruptorQueue.java
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedTransferQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import dorkbox.util.messagebus.MultiMBassador;
|
||||||
|
import dorkbox.util.messagebus.common.NamedThreadFactory;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public class DisruptorQueue {
|
||||||
|
|
||||||
|
|
||||||
|
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
|
||||||
|
|
||||||
|
private final ExecutorService executor;
|
||||||
|
|
||||||
|
// must be power of 2.
|
||||||
|
private final int ringBufferSize = 1;
|
||||||
|
|
||||||
|
private final RingBuffer<MessageHolder> ringBuffer;
|
||||||
|
private WaitingWorkerPool<MessageHolder> workerPool;
|
||||||
|
|
||||||
|
public DisruptorQueue(MultiMBassador mbassador, int numberOfThreads) {
|
||||||
|
// numberOfThreads = 4;
|
||||||
|
|
||||||
|
// only used to startup threads, can be replaced with static list of threads
|
||||||
|
this.executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0,
|
||||||
|
TimeUnit.NANOSECONDS,
|
||||||
|
new LinkedTransferQueue<Runnable>(),
|
||||||
|
new NamedThreadFactory("disruptor"));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
EventBusFactory factory = new EventBusFactory();
|
||||||
|
PublicationExceptionHandler loggingExceptionHandler = new PublicationExceptionHandler(mbassador);
|
||||||
|
|
||||||
|
WorkHandlerEarlyRelease<MessageHolder> handlers[] = new EventProcessor2[numberOfThreads];
|
||||||
|
for (int i = 0; i < handlers.length; i++) {
|
||||||
|
handlers[i] = new EventProcessor2(mbassador);
|
||||||
|
}
|
||||||
|
|
||||||
|
WaitStrategy consumerWaitStrategy = new BlockingWaitStrategy();
|
||||||
|
WaitingMultiProducerSequencer sequencer = new WaitingMultiProducerSequencer(4, loggingExceptionHandler, consumerWaitStrategy);
|
||||||
|
this.ringBuffer = new RingBuffer<MessageHolder>(factory, sequencer);
|
||||||
|
|
||||||
|
|
||||||
|
SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier();
|
||||||
|
this.workerPool = new WaitingWorkerPool<MessageHolder>(this.ringBuffer,
|
||||||
|
sequencer,
|
||||||
|
sequenceBarrier,
|
||||||
|
loggingExceptionHandler,
|
||||||
|
handlers);
|
||||||
|
|
||||||
|
sequencer.addGatingSequences(this.workerPool.getWorkerSequences()); // to notify our consumers (if they are blocking) of a new element
|
||||||
|
this.workerPool.start(this.executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void transfer(Object message1) throws InterruptedException {
|
||||||
|
// put this on the disruptor ring buffer
|
||||||
|
final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
|
||||||
|
|
||||||
|
// setup the job
|
||||||
|
final long seq = ringBuffer.next();
|
||||||
|
try {
|
||||||
|
// System.err.println("+(" + seq + ") " + message1);
|
||||||
|
MessageHolder eventJob = ringBuffer.get(seq);
|
||||||
|
eventJob.messageType = MessageType.ONE;
|
||||||
|
eventJob.message1 = message1;
|
||||||
|
// eventJob.message2 = message2;
|
||||||
|
// eventJob.message3 = message3;
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
// handlePublicationError(new PublicationError()
|
||||||
|
// .setMessage("Error while adding an asynchronous message")
|
||||||
|
// .setCause(e)
|
||||||
|
// .setPublishedObject(message1));
|
||||||
|
} finally {
|
||||||
|
// always publish the job
|
||||||
|
ringBuffer.publish(seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasPendingMessages() {
|
||||||
|
final long cursor = this.ringBuffer.getCursor();
|
||||||
|
Sequence[] workerSequences = this.workerPool.getWorkerSequences();
|
||||||
|
for (Sequence s : workerSequences) {
|
||||||
|
if (cursor > s.get())
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
18
src/main/java/com/lmax/disruptor/EventBusFactory.java
Normal file
18
src/main/java/com/lmax/disruptor/EventBusFactory.java
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author dorkbox, llc
|
||||||
|
* Date: 2/2/15
|
||||||
|
*/
|
||||||
|
public class EventBusFactory implements EventFactory<MessageHolder> {
|
||||||
|
|
||||||
|
public EventBusFactory() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageHolder newInstance() {
|
||||||
|
return new MessageHolder();
|
||||||
|
}
|
||||||
|
}
|
53
src/main/java/com/lmax/disruptor/EventProcessor2.java
Normal file
53
src/main/java/com/lmax/disruptor/EventProcessor2.java
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import dorkbox.util.messagebus.PubSubSupport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author dorkbox, llc
|
||||||
|
* Date: 2/2/15
|
||||||
|
*/
|
||||||
|
public class EventProcessor2 implements WorkHandlerEarlyRelease<MessageHolder> {
|
||||||
|
private final PubSubSupport publisher;
|
||||||
|
private WaitingWorkProcessor workProcessor;
|
||||||
|
|
||||||
|
public EventProcessor2(PubSubSupport publisher) {
|
||||||
|
this.publisher = publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProcessor(WaitingWorkProcessor workProcessor) {
|
||||||
|
// this.workProcessor = workProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onEvent(long sequence, MessageHolder event) throws Exception {
|
||||||
|
MessageType messageType = event.messageType;
|
||||||
|
switch (messageType) {
|
||||||
|
case ONE: {
|
||||||
|
Object message1 = event.message1;
|
||||||
|
// System.err.println("(" + sequence + ")" + message1);
|
||||||
|
// this.workProcessor.release(sequence);
|
||||||
|
this.publisher.publish(message1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case TWO: {
|
||||||
|
Object message1 = event.message1;
|
||||||
|
Object message2 = event.message2;
|
||||||
|
this.publisher.publish(message1, message2);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case THREE: {
|
||||||
|
Object message1 = event.message1;
|
||||||
|
Object message2 = event.message2;
|
||||||
|
Object message3 = event.message3;
|
||||||
|
this.publisher.publish(message1, message2, message3);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case ARRAY: {
|
||||||
|
Object[] messages = event.messages;
|
||||||
|
this.publisher.publish(messages);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,9 +1,8 @@
|
|||||||
package dorkbox.util.messagebus.subscription;
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author dorkbox, llc
|
* @author dorkbox, llc Date: 2/2/15
|
||||||
* Date: 2/2/15
|
|
||||||
*/
|
*/
|
||||||
public class MessageHolder {
|
public class MessageHolder {
|
||||||
public MessageType messageType = MessageType.ONE;
|
public MessageType messageType = MessageType.ONE;
|
||||||
@ -13,6 +12,5 @@ public class MessageHolder {
|
|||||||
public Object message3 = null;
|
public Object message3 = null;
|
||||||
public Object[] messages = null;
|
public Object[] messages = null;
|
||||||
|
|
||||||
public MessageHolder() {
|
public MessageHolder() {}
|
||||||
}
|
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package dorkbox.util.messagebus.subscription;
|
package com.lmax.disruptor;
|
||||||
/**
|
/**
|
||||||
* @author dorkbox, llc
|
* @author dorkbox, llc
|
||||||
* Date: 2/2/15
|
* Date: 2/2/15
|
@ -0,0 +1,35 @@
|
|||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.ExceptionHandler;
|
||||||
|
|
||||||
|
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
|
||||||
|
import dorkbox.util.messagebus.error.PublicationError;
|
||||||
|
|
||||||
|
public final class PublicationExceptionHandler implements ExceptionHandler {
|
||||||
|
private final ErrorHandlingSupport errorHandler;
|
||||||
|
|
||||||
|
public PublicationExceptionHandler(ErrorHandlingSupport errorHandler) {
|
||||||
|
this.errorHandler = errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleEventException(final Throwable e, final long sequence, final Object event) {
|
||||||
|
this.errorHandler.handlePublicationError(new PublicationError()
|
||||||
|
.setMessage("Exception processing: " + sequence + " " + event.getClass() + "(" + event + ")")
|
||||||
|
.setCause(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleOnStartException(final Throwable e) {
|
||||||
|
this.errorHandler.handlePublicationError(new PublicationError()
|
||||||
|
.setMessage("Error starting the disruptor")
|
||||||
|
.setCause(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleOnShutdownException(final Throwable e) {
|
||||||
|
this.errorHandler.handlePublicationError(new PublicationError()
|
||||||
|
.setMessage("Error stopping the disruptor")
|
||||||
|
.setCause(e));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,371 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 LMAX Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import sun.misc.Unsafe;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.util.Util;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
|
||||||
|
* Suitable for use for sequencing across multiple publisher threads.</p>
|
||||||
|
*
|
||||||
|
* <p> * <p>Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
|
||||||
|
* to {@link Sequencer#next()}, to determine the highest available sequence that can be read, then
|
||||||
|
* {@link Sequencer#getHighestPublishedSequence(long, long)} should be used.
|
||||||
|
*/
|
||||||
|
public final class WaitingMultiProducerSequencer extends AbstractSequencer implements WaitStrategy
|
||||||
|
{
|
||||||
|
private static final Unsafe UNSAFE = Util.getUnsafe();
|
||||||
|
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
|
||||||
|
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
|
||||||
|
|
||||||
|
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
|
||||||
|
|
||||||
|
private final Lock lock = new ReentrantLock();
|
||||||
|
private final Condition processorNotifyCondition = this.lock.newCondition();
|
||||||
|
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
private final ExceptionHandler exceptionHandler;
|
||||||
|
|
||||||
|
// availableBuffer tracks the state of each ringbuffer slot
|
||||||
|
// see below for more details on the approach
|
||||||
|
private final int[] availableBuffer;
|
||||||
|
private final int indexMask;
|
||||||
|
private final int indexShift;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a Sequencer with the selected wait strategy and buffer size.
|
||||||
|
*
|
||||||
|
* @param bufferSize the size of the buffer that this will sequence over.
|
||||||
|
* @param producerWaitStrategy for those producing sequences
|
||||||
|
* @param consumerWaitStrategy for those waiting on sequences.
|
||||||
|
*/
|
||||||
|
public WaitingMultiProducerSequencer(int bufferSize, ExceptionHandler exceptionHandler, WaitStrategy consumerWaitStrategy)
|
||||||
|
{
|
||||||
|
super(bufferSize, consumerWaitStrategy);
|
||||||
|
this.exceptionHandler = exceptionHandler;
|
||||||
|
|
||||||
|
this.availableBuffer = new int[bufferSize];
|
||||||
|
this.indexMask = bufferSize - 1;
|
||||||
|
this.indexShift = Util.log2(bufferSize);
|
||||||
|
initialiseAvailableBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void halt() {
|
||||||
|
this.running.set(false);
|
||||||
|
signalAllWhenBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#hasAvailableCapacity(int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean hasAvailableCapacity(final int requiredCapacity)
|
||||||
|
{
|
||||||
|
return hasAvailableCapacity(this.gatingSequences, requiredCapacity, this.cursor.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
|
||||||
|
{
|
||||||
|
long wrapPoint = cursorValue + requiredCapacity - this.bufferSize;
|
||||||
|
long cachedGatingSequence = this.gatingSequenceCache.get();
|
||||||
|
|
||||||
|
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
|
||||||
|
{
|
||||||
|
long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
|
||||||
|
this.gatingSequenceCache.set(minSequence);
|
||||||
|
|
||||||
|
if (wrapPoint > minSequence)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#claim(long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void claim(long sequence)
|
||||||
|
{
|
||||||
|
this.cursor.set(sequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#next()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long next()
|
||||||
|
{
|
||||||
|
return next(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#next(int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long next(int n)
|
||||||
|
{
|
||||||
|
if (n < 1)
|
||||||
|
{
|
||||||
|
throw new IllegalArgumentException("n must be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
long current;
|
||||||
|
long next;
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
current = this.cursor.get();
|
||||||
|
next = current + n;
|
||||||
|
|
||||||
|
long wrapPoint = next - this.bufferSize;
|
||||||
|
long cachedGatingSequence = this.gatingSequenceCache.get();
|
||||||
|
|
||||||
|
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
|
||||||
|
{
|
||||||
|
long gatingSequence = Util.getMinimumSequence(this.gatingSequences, current);
|
||||||
|
|
||||||
|
if (wrapPoint > gatingSequence)
|
||||||
|
{
|
||||||
|
// try {
|
||||||
|
// waitFor(gatingSequence+1, this.cursor, null, null); // as soon as a spot frees by a consumer, continue
|
||||||
|
// }
|
||||||
|
// catch (final AlertException ex)
|
||||||
|
// {
|
||||||
|
// if (!this.running.get())
|
||||||
|
// {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// } catch (final Throwable ex)
|
||||||
|
// {
|
||||||
|
// this.exceptionHandler.handleEventException(ex, next, null);
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
|
||||||
|
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.gatingSequenceCache.set(gatingSequence);
|
||||||
|
}
|
||||||
|
else if (this.cursor.compareAndSet(current, next))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (true);
|
||||||
|
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#tryNext()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long tryNext() throws InsufficientCapacityException
|
||||||
|
{
|
||||||
|
return tryNext(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#tryNext(int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long tryNext(int n) throws InsufficientCapacityException
|
||||||
|
{
|
||||||
|
if (n < 1)
|
||||||
|
{
|
||||||
|
throw new IllegalArgumentException("n must be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
long current;
|
||||||
|
long next;
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
current = this.cursor.get();
|
||||||
|
next = current + n;
|
||||||
|
|
||||||
|
if (!hasAvailableCapacity(this.gatingSequences, n, current))
|
||||||
|
{
|
||||||
|
throw InsufficientCapacityException.INSTANCE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (!this.cursor.compareAndSet(current, next));
|
||||||
|
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#remainingCapacity()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long remainingCapacity()
|
||||||
|
{
|
||||||
|
long consumed = Util.getMinimumSequence(this.gatingSequences, this.cursor.get());
|
||||||
|
long produced = this.cursor.get();
|
||||||
|
return getBufferSize() - (produced - consumed);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initialiseAvailableBuffer()
|
||||||
|
{
|
||||||
|
for (int i = this.availableBuffer.length - 1; i != 0; i--)
|
||||||
|
{
|
||||||
|
setAvailableBufferValue(i, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
setAvailableBufferValue(0, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#publish(long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void publish(final long sequence)
|
||||||
|
{
|
||||||
|
setAvailable(sequence);
|
||||||
|
this.waitStrategy.signalAllWhenBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#publish(long, long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void publish(long lo, long hi)
|
||||||
|
{
|
||||||
|
for (long l = lo; l <= hi; l++)
|
||||||
|
{
|
||||||
|
setAvailable(l);
|
||||||
|
}
|
||||||
|
this.waitStrategy.signalAllWhenBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The below methods work on the availableBuffer flag.
|
||||||
|
*
|
||||||
|
* The prime reason is to avoid a shared sequence object between publisher threads.
|
||||||
|
* (Keeping single pointers tracking start and end would require coordination
|
||||||
|
* between the threads).
|
||||||
|
*
|
||||||
|
* -- Firstly we have the constraint that the delta between the cursor and minimum
|
||||||
|
* gating sequence will never be larger than the buffer size (the code in
|
||||||
|
* next/tryNext in the Sequence takes care of that).
|
||||||
|
* -- Given that; take the sequence value and mask off the lower portion of the
|
||||||
|
* sequence as the index into the buffer (indexMask). (aka modulo operator)
|
||||||
|
* -- The upper portion of the sequence becomes the value to check for availability.
|
||||||
|
* ie: it tells us how many times around the ring buffer we've been (aka division)
|
||||||
|
* -- Because we can't wrap without the gating sequences moving forward (i.e. the
|
||||||
|
* minimum gating sequence is effectively our last available position in the
|
||||||
|
* buffer), when we have new data and successfully claimed a slot we can simply
|
||||||
|
* write over the top.
|
||||||
|
*/
|
||||||
|
private void setAvailable(final long sequence)
|
||||||
|
{
|
||||||
|
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setAvailableBufferValue(int index, int flag)
|
||||||
|
{
|
||||||
|
long bufferAddress = index * SCALE + BASE;
|
||||||
|
UNSAFE.putOrderedInt(this.availableBuffer, bufferAddress, flag);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see Sequencer#isAvailable(long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isAvailable(long sequence)
|
||||||
|
{
|
||||||
|
int index = calculateIndex(sequence);
|
||||||
|
int flag = calculateAvailabilityFlag(sequence);
|
||||||
|
long bufferAddress = index * SCALE + BASE;
|
||||||
|
return UNSAFE.getIntVolatile(this.availableBuffer, bufferAddress) == flag;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
|
||||||
|
{
|
||||||
|
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
|
||||||
|
{
|
||||||
|
if (!isAvailable(sequence))
|
||||||
|
{
|
||||||
|
return sequence - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return availableSequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int calculateAvailabilityFlag(final long sequence)
|
||||||
|
{
|
||||||
|
return (int) (sequence >>> this.indexShift);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int calculateIndex(final long sequence)
|
||||||
|
{
|
||||||
|
return (int) sequence & this.indexMask;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
|
||||||
|
throws AlertException, InterruptedException
|
||||||
|
{
|
||||||
|
long availableSequence;
|
||||||
|
|
||||||
|
if ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence)
|
||||||
|
{
|
||||||
|
this.lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while ((availableSequence = Util.getMinimumSequence(this.gatingSequences)) < sequence && this.running.get())
|
||||||
|
{
|
||||||
|
this.processorNotifyCondition.await();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
this.lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return availableSequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void signalAllWhenBlocking()
|
||||||
|
{
|
||||||
|
this.lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
this.processorNotifyCondition.signalAll();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
this.lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
208
src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java
Normal file
208
src/main/java/com/lmax/disruptor/WaitingWorkProcessor.java
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 LMAX Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A {@link WaitingWorkProcessor} wraps a single {@link WorkHandler}, effectively consuming the sequence
|
||||||
|
* and ensuring appropriate barriers.</p>
|
||||||
|
*
|
||||||
|
* <p>Generally, this will be used as part of a {@link WaitingWorkerPool}.</p>
|
||||||
|
*
|
||||||
|
* @param <T> event implementation storing the details for the work to processed.
|
||||||
|
*/
|
||||||
|
public final class WaitingWorkProcessor<T>
|
||||||
|
implements EventProcessor
|
||||||
|
{
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
|
||||||
|
private final RingBuffer<T> ringBuffer;
|
||||||
|
private final SequenceBarrier sequenceBarrier;
|
||||||
|
private final WorkHandlerEarlyRelease<? super T> workHandler;
|
||||||
|
private final ExceptionHandler exceptionHandler;
|
||||||
|
private final Sequence workSequence;
|
||||||
|
|
||||||
|
private final EventReleaser eventReleaser = new EventReleaser()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void release()
|
||||||
|
{
|
||||||
|
WaitingWorkProcessor.this.sequence.set(Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private WaitStrategy publisherStrategy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a {@link WaitingWorkProcessor}.
|
||||||
|
*
|
||||||
|
* @param ringBuffer to which events are published.
|
||||||
|
* @param sequenceBarrier on which it is waiting.
|
||||||
|
* @param sequenceBarrier
|
||||||
|
* @param workHandler is the delegate to which events are dispatched.
|
||||||
|
* @param exceptionHandler to be called back when an error occurs
|
||||||
|
* @param workSequence from which to claim the next event to be worked on. It should always be initialised
|
||||||
|
* as {@link Sequencer#INITIAL_CURSOR_VALUE}
|
||||||
|
*/
|
||||||
|
public WaitingWorkProcessor(final RingBuffer<T> ringBuffer,
|
||||||
|
final WaitStrategy publisherStrategy,
|
||||||
|
final SequenceBarrier sequenceBarrier,
|
||||||
|
final WorkHandlerEarlyRelease<? super T> workHandler,
|
||||||
|
final ExceptionHandler exceptionHandler,
|
||||||
|
final Sequence workSequence)
|
||||||
|
{
|
||||||
|
this.ringBuffer = ringBuffer;
|
||||||
|
this.publisherStrategy = publisherStrategy;
|
||||||
|
this.sequenceBarrier = sequenceBarrier;
|
||||||
|
this.workHandler = workHandler;
|
||||||
|
this.exceptionHandler = exceptionHandler;
|
||||||
|
this.workSequence = workSequence;
|
||||||
|
|
||||||
|
if (this.workHandler instanceof EventReleaseAware)
|
||||||
|
{
|
||||||
|
((EventReleaseAware)this.workHandler).setEventReleaser(this.eventReleaser);
|
||||||
|
}
|
||||||
|
|
||||||
|
workHandler.setProcessor(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence getSequence()
|
||||||
|
{
|
||||||
|
return this.sequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void halt()
|
||||||
|
{
|
||||||
|
this.running.set(false);
|
||||||
|
this.sequenceBarrier.alert();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning()
|
||||||
|
{
|
||||||
|
return this.running.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It is ok to have another thread re-run this method after a halt().
|
||||||
|
*
|
||||||
|
* @throws IllegalStateException if this processor is already running
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
if (!this.running.compareAndSet(false, true))
|
||||||
|
{
|
||||||
|
throw new IllegalStateException("Thread is already running");
|
||||||
|
}
|
||||||
|
this.sequenceBarrier.clearAlert();
|
||||||
|
|
||||||
|
notifyStart();
|
||||||
|
|
||||||
|
WaitStrategy publisherStrategy = this.publisherStrategy;
|
||||||
|
boolean processedSequence = true;
|
||||||
|
long cachedAvailableSequence = Long.MIN_VALUE;
|
||||||
|
long nextSequence = this.sequence.get();
|
||||||
|
T event = null;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// if previous sequence was processed - fetch the next sequence and set
|
||||||
|
// that we have successfully processed the previous sequence
|
||||||
|
// typically, this will be true
|
||||||
|
// this prevents the sequence getting too far forward if an exception
|
||||||
|
// is thrown from the WorkHandler
|
||||||
|
if (processedSequence)
|
||||||
|
{
|
||||||
|
processedSequence = false;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
nextSequence = this.workSequence.get() + 1L;
|
||||||
|
// this tells the producer that we are done with our sequence, and that it can reuse it's spot in the ring buffer
|
||||||
|
this.sequence.set(nextSequence - 1L);
|
||||||
|
}
|
||||||
|
while (!this.workSequence.compareAndSet(nextSequence - 1L, nextSequence));
|
||||||
|
// publisherStrategy.signalAllWhenBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cachedAvailableSequence >= nextSequence)
|
||||||
|
{
|
||||||
|
event = this.ringBuffer.get(nextSequence);
|
||||||
|
this.workHandler.onEvent(nextSequence, event);
|
||||||
|
processedSequence = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cachedAvailableSequence = this.sequenceBarrier.waitFor(nextSequence);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (final AlertException ex)
|
||||||
|
{
|
||||||
|
if (!this.running.get())
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (final Throwable ex)
|
||||||
|
{
|
||||||
|
// handle, mark as processed, unless the exception handler threw an exception
|
||||||
|
this.exceptionHandler.handleEventException(ex, nextSequence, event);
|
||||||
|
processedSequence = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
notifyShutdown();
|
||||||
|
|
||||||
|
this.running.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyStart()
|
||||||
|
{
|
||||||
|
if (this.workHandler instanceof LifecycleAware)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
((LifecycleAware)this.workHandler).onStart();
|
||||||
|
}
|
||||||
|
catch (final Throwable ex)
|
||||||
|
{
|
||||||
|
this.exceptionHandler.handleOnStartException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyShutdown()
|
||||||
|
{
|
||||||
|
if (this.workHandler instanceof LifecycleAware)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
((LifecycleAware)this.workHandler).onShutdown();
|
||||||
|
}
|
||||||
|
catch (final Throwable ex)
|
||||||
|
{
|
||||||
|
this.exceptionHandler.handleOnShutdownException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release(long sequence) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
150
src/main/java/com/lmax/disruptor/WaitingWorkerPool.java
Normal file
150
src/main/java/com/lmax/disruptor/WaitingWorkerPool.java
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 LMAX Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.util.Util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* WorkerPool contains a pool of {@link WaitingWorkProcessor}s that will consume sequences so jobs can be farmed out across a pool of workers.
|
||||||
|
* Each of the {@link WaitingWorkProcessor}s manage and calls a {@link WorkHandler} to process the events.
|
||||||
|
*
|
||||||
|
* @param <T> event to be processed by a pool of workers
|
||||||
|
*/
|
||||||
|
public final class WaitingWorkerPool<T>
|
||||||
|
{
|
||||||
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
|
||||||
|
private final RingBuffer<T> ringBuffer;
|
||||||
|
// WorkProcessors are created to wrap each of the provided WorkHandlers
|
||||||
|
private final WaitingWorkProcessor<?>[] workProcessors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a worker pool to enable an array of {@link WorkHandler}s to consume published sequences.
|
||||||
|
*
|
||||||
|
* This option requires a pre-configured {@link RingBuffer} which must have {@link RingBuffer#addGatingSequences(Sequence...)}
|
||||||
|
* called before the work pool is started.
|
||||||
|
*
|
||||||
|
* @param ringBuffer of events to be consumed.
|
||||||
|
* @param sequenceBarrier on which the workers will depend.
|
||||||
|
* @param sequenceBarrier2
|
||||||
|
* @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
|
||||||
|
* @param workHandlers to distribute the work load across.
|
||||||
|
*/
|
||||||
|
public WaitingWorkerPool(final RingBuffer<T> ringBuffer,
|
||||||
|
final WaitStrategy publisherStrategy,
|
||||||
|
final SequenceBarrier sequenceBarrier,
|
||||||
|
final ExceptionHandler exceptionHandler,
|
||||||
|
final WorkHandlerEarlyRelease<? super T>... workHandlers)
|
||||||
|
{
|
||||||
|
this.ringBuffer = ringBuffer;
|
||||||
|
final int numWorkers = workHandlers.length;
|
||||||
|
this.workProcessors = new WaitingWorkProcessor[numWorkers];
|
||||||
|
|
||||||
|
for (int i = 0; i < numWorkers; i++)
|
||||||
|
{
|
||||||
|
this.workProcessors[i] = new WaitingWorkProcessor<T>(ringBuffer,
|
||||||
|
publisherStrategy,
|
||||||
|
sequenceBarrier,
|
||||||
|
workHandlers[i],
|
||||||
|
exceptionHandler,
|
||||||
|
this.workSequence);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an array of {@link Sequence}s representing the progress of the workers.
|
||||||
|
*
|
||||||
|
* @return an array of {@link Sequence}s representing the progress of the workers.
|
||||||
|
*/
|
||||||
|
public Sequence[] getWorkerSequences()
|
||||||
|
{
|
||||||
|
// final Sequence[] sequences = new Sequence[this.workProcessors.length + 1];
|
||||||
|
final Sequence[] sequences = new Sequence[this.workProcessors.length];
|
||||||
|
for (int i = 0, size = this.workProcessors.length; i < size; i++)
|
||||||
|
{
|
||||||
|
sequences[i] = this.workProcessors[i].getSequence();
|
||||||
|
}
|
||||||
|
// sequences[sequences.length - 1] = this.workSequence;
|
||||||
|
|
||||||
|
return sequences;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the worker pool processing events in sequence.
|
||||||
|
*
|
||||||
|
* @param executor providing threads for running the workers.
|
||||||
|
* @return the {@link RingBuffer} used for the work queue.
|
||||||
|
* @throws IllegalStateException if the pool has already been started and not halted yet
|
||||||
|
*/
|
||||||
|
public RingBuffer<T> start(final Executor executor)
|
||||||
|
{
|
||||||
|
if (!this.started.compareAndSet(false, true))
|
||||||
|
{
|
||||||
|
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final long cursor = this.ringBuffer.getCursor();
|
||||||
|
this.workSequence.set(cursor);
|
||||||
|
|
||||||
|
for (WaitingWorkProcessor<?> processor : this.workProcessors)
|
||||||
|
{
|
||||||
|
processor.getSequence().set(cursor);
|
||||||
|
executor.execute(processor);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.ringBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for the {@link RingBuffer} to drain of published events then halt the workers.
|
||||||
|
*/
|
||||||
|
public void drainAndHalt()
|
||||||
|
{
|
||||||
|
Sequence[] workerSequences = getWorkerSequences();
|
||||||
|
while (this.ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences))
|
||||||
|
{
|
||||||
|
Thread.yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (WaitingWorkProcessor<?> processor : this.workProcessors)
|
||||||
|
{
|
||||||
|
processor.halt();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.started.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Halt all workers immediately at the end of their current cycle.
|
||||||
|
*/
|
||||||
|
public void halt()
|
||||||
|
{
|
||||||
|
for (WaitingWorkProcessor<?> processor : this.workProcessors)
|
||||||
|
{
|
||||||
|
processor.halt();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.started.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRunning()
|
||||||
|
{
|
||||||
|
return this.started.get();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 LMAX Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package com.lmax.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback interface to be implemented for processing units of work as they become available in the {@link RingBuffer}.
|
||||||
|
*
|
||||||
|
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
|
||||||
|
* @see WaitingWorkerPool
|
||||||
|
*/
|
||||||
|
public interface WorkHandlerEarlyRelease<T>
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Callback to indicate a unit of work needs to be processed.
|
||||||
|
*
|
||||||
|
* @param event published to the {@link RingBuffer}
|
||||||
|
* @throws Exception if the {@link WorkHandlerEarlyRelease} would like the exception handled further up the chain.
|
||||||
|
*/
|
||||||
|
void onEvent(long sequence, T event) throws Exception;
|
||||||
|
|
||||||
|
void setProcessor(WaitingWorkProcessor workProcessor);
|
||||||
|
}
|
@ -6,10 +6,11 @@ import java.util.Collection;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.MessageHolder;
|
||||||
|
|
||||||
import dorkbox.util.messagebus.common.DeadMessage;
|
import dorkbox.util.messagebus.common.DeadMessage;
|
||||||
import dorkbox.util.messagebus.common.LinkedTransferQueue;
|
|
||||||
import dorkbox.util.messagebus.common.NamedThreadFactory;
|
import dorkbox.util.messagebus.common.NamedThreadFactory;
|
||||||
import dorkbox.util.messagebus.common.TransferQueue;
|
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
|
||||||
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
|
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
|
||||||
import dorkbox.util.messagebus.error.PublicationError;
|
import dorkbox.util.messagebus.error.PublicationError;
|
||||||
import dorkbox.util.messagebus.subscription.Subscription;
|
import dorkbox.util.messagebus.subscription.Subscription;
|
||||||
@ -40,7 +41,9 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
// this handler will receive all errors that occur during message dispatch or message handling
|
// this handler will receive all errors that occur during message dispatch or message handling
|
||||||
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
|
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
|
||||||
|
|
||||||
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
|
// private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
|
||||||
|
// private final DisruptorQueue dispatchQueue;
|
||||||
|
private final SimpleQueue dispatchQueue;
|
||||||
|
|
||||||
private final SubscriptionManager subscriptionManager;
|
private final SubscriptionManager subscriptionManager;
|
||||||
|
|
||||||
@ -61,7 +64,7 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
*/
|
*/
|
||||||
public MultiMBassador() {
|
public MultiMBassador() {
|
||||||
this(Runtime.getRuntime().availableProcessors());
|
this(Runtime.getRuntime().availableProcessors());
|
||||||
// this(4);
|
// this(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -84,6 +87,9 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
}
|
}
|
||||||
this.numberOfThreads = numberOfThreads;
|
this.numberOfThreads = numberOfThreads;
|
||||||
|
|
||||||
|
// this.dispatchQueue = new DisruptorQueue(this, numberOfThreads);
|
||||||
|
this.dispatchQueue = new SimpleQueue(numberOfThreads);
|
||||||
|
|
||||||
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
|
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
|
||||||
this.threads = new ArrayDeque<Thread>(numberOfThreads);
|
this.threads = new ArrayDeque<Thread>(numberOfThreads);
|
||||||
|
|
||||||
@ -94,8 +100,10 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
@SuppressWarnings("null")
|
@SuppressWarnings("null")
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
|
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
|
||||||
Runnable event = null;
|
SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
|
||||||
|
// Runnable event = null;
|
||||||
|
MessageHolder event = null;
|
||||||
int spins;
|
int spins;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -113,7 +121,11 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
event.run();
|
Object message1 = event.message1;
|
||||||
|
IN_QUEUE.release(event);
|
||||||
|
// event.run();
|
||||||
|
publish(message1);
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -125,8 +137,6 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
this.threads.add(runner);
|
this.threads.add(runner);
|
||||||
runner.start();
|
runner.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -157,7 +167,8 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasPendingMessages() {
|
public boolean hasPendingMessages() {
|
||||||
return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
|
// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
|
||||||
|
return this.dispatchQueue.hasPendingMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -396,15 +407,16 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
@Override
|
@Override
|
||||||
public void publishAsync(final Object message) {
|
public void publishAsync(final Object message) {
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
Runnable runnable = new Runnable() {
|
// Runnable runnable = new Runnable() {
|
||||||
@Override
|
// @Override
|
||||||
public void run() {
|
// public void run() {
|
||||||
MultiMBassador.this.publish(message);
|
// MultiMBassador.this.publish(message);
|
||||||
}
|
// }
|
||||||
};
|
// };
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.dispatchQueue.transfer(runnable);
|
// this.dispatchQueue.transfer(runnable);
|
||||||
|
this.dispatchQueue.transfer(message);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
handlePublicationError(new PublicationError()
|
handlePublicationError(new PublicationError()
|
||||||
.setMessage("Error while adding an asynchronous message")
|
.setMessage("Error while adding an asynchronous message")
|
||||||
|
@ -0,0 +1,109 @@
|
|||||||
|
package dorkbox.util.messagebus.common.simpleq;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.MessageHolder;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public class SimpleQueue {
|
||||||
|
|
||||||
|
|
||||||
|
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
|
||||||
|
|
||||||
|
private final Lock publisherLock = new ReentrantLock();
|
||||||
|
private final Condition publisherNotifyCondition = this.publisherLock.newCondition();
|
||||||
|
|
||||||
|
private final Lock consumerLock = new ReentrantLock();
|
||||||
|
private final Condition consumerNotifyCondition = this.consumerLock.newCondition();
|
||||||
|
|
||||||
|
private final AtomicReference<MessageHolder> consumer = new AtomicReference<MessageHolder>();
|
||||||
|
private final AtomicReference<MessageHolder> producer = new AtomicReference<MessageHolder>();
|
||||||
|
|
||||||
|
private volatile boolean waitingProducer = false;
|
||||||
|
private final AtomicInteger availableThreads = new AtomicInteger();
|
||||||
|
|
||||||
|
|
||||||
|
public SimpleQueue(int numberOfThreads) {
|
||||||
|
this.availableThreads.set(numberOfThreads);
|
||||||
|
this.producer.set(new MessageHolder());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void transfer(Object message1) throws InterruptedException {
|
||||||
|
MessageHolder holder = null;
|
||||||
|
|
||||||
|
if ((holder = this.producer.getAndSet(null)) == null) {
|
||||||
|
this.publisherLock.lock();
|
||||||
|
try {
|
||||||
|
while ((holder = this.producer.getAndSet(null)) == null) {
|
||||||
|
this.waitingProducer = true;
|
||||||
|
this.publisherNotifyCondition.await();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.publisherLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
holder.message1 = message1;
|
||||||
|
this.consumer.set(holder);
|
||||||
|
|
||||||
|
this.consumerLock.lock();
|
||||||
|
try {
|
||||||
|
this.consumerNotifyCondition.signalAll();
|
||||||
|
} finally {
|
||||||
|
this.consumerLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasPendingMessages() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageHolder poll() {
|
||||||
|
return this.consumer.getAndSet(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageHolder take() throws InterruptedException {
|
||||||
|
MessageHolder holder = null;
|
||||||
|
|
||||||
|
this.consumerLock.lock();
|
||||||
|
try {
|
||||||
|
while ((holder = this.consumer.getAndSet(null)) == null) {
|
||||||
|
this.consumerNotifyCondition.await();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.consumerLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return holder;
|
||||||
|
}
|
||||||
|
|
||||||
|
// release the event back to the publisher
|
||||||
|
// notify publisher in case pub was waiting
|
||||||
|
public void release(MessageHolder holder) {
|
||||||
|
this.producer.set(holder);
|
||||||
|
|
||||||
|
if (this.waitingProducer) {
|
||||||
|
this.publisherLock.lock();
|
||||||
|
try {
|
||||||
|
if (this.waitingProducer) {
|
||||||
|
this.waitingProducer = false;
|
||||||
|
this.publisherNotifyCondition.signalAll();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.publisherLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,47 +0,0 @@
|
|||||||
package dorkbox.util.messagebus.subscription;
|
|
||||||
|
|
||||||
import com.lmax.disruptor.WorkHandler;
|
|
||||||
|
|
||||||
import dorkbox.util.messagebus.PubSubSupport;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author dorkbox, llc
|
|
||||||
* Date: 2/2/15
|
|
||||||
*/
|
|
||||||
public class EventProcessor implements WorkHandler<MessageHolder> {
|
|
||||||
private final PubSubSupport publisher;
|
|
||||||
|
|
||||||
public EventProcessor(PubSubSupport publisher) {
|
|
||||||
this.publisher = publisher;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onEvent(MessageHolder event) throws Exception {
|
|
||||||
MessageType messageType = event.messageType;
|
|
||||||
switch (messageType) {
|
|
||||||
case ONE: {
|
|
||||||
this.publisher.publish(event.message1);
|
|
||||||
event.message1 = null; // cleanup
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case TWO: {
|
|
||||||
this.publisher.publish(event.message1, event.message2);
|
|
||||||
event.message1 = null; // cleanup
|
|
||||||
event.message2 = null; // cleanup
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case THREE: {
|
|
||||||
this.publisher.publish(event.message1, event.message2, event.message3);
|
|
||||||
event.message1 = null; // cleanup
|
|
||||||
event.message2 = null; // cleanup
|
|
||||||
event.message3 = null; // cleanup
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case ARRAY: {
|
|
||||||
this.publisher.publish(event.messages);
|
|
||||||
event.messages = null; // cleanup
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,11 +3,10 @@
|
|||||||
*/
|
*/
|
||||||
package dorkbox.util.messagebus;
|
package dorkbox.util.messagebus;
|
||||||
|
|
||||||
import dorkbox.util.messagebus.MultiMBassador;
|
import junit.framework.Assert;
|
||||||
import dorkbox.util.messagebus.annotations.Handler;
|
import dorkbox.util.messagebus.annotations.Handler;
|
||||||
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
|
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
|
||||||
import dorkbox.util.messagebus.error.PublicationError;
|
import dorkbox.util.messagebus.error.PublicationError;
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author dorkbox, llc Date: 2/2/15
|
* @author dorkbox, llc Date: 2/2/15
|
||||||
@ -41,9 +40,10 @@ public class PerformanceTest {
|
|||||||
Listener listener1 = new Listener();
|
Listener listener1 = new Listener();
|
||||||
bus.subscribe(listener1);
|
bus.subscribe(listener1);
|
||||||
|
|
||||||
long num = Long.MAX_VALUE;
|
|
||||||
while (num-- > 0) {
|
long num = 0;
|
||||||
bus.publishAsync("s");
|
while (num < Long.MAX_VALUE) {
|
||||||
|
bus.publishAsync(num++);
|
||||||
}
|
}
|
||||||
|
|
||||||
// bus.publish("s", "s");
|
// bus.publish("s", "s");
|
||||||
@ -60,11 +60,16 @@ public class PerformanceTest {
|
|||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public static class Listener {
|
public static class Listener {
|
||||||
@Handler
|
@Handler
|
||||||
public void handleSync(String o1) {
|
public void handleSync(Long o1) {
|
||||||
count++;
|
count++;
|
||||||
// System.err.println("match String");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(String o1) {
|
||||||
|
// count++;
|
||||||
|
//// System.err.println("match String");
|
||||||
|
// }
|
||||||
|
|
||||||
// @Handler
|
// @Handler
|
||||||
// public void handleSync(String o1, String o2) {
|
// public void handleSync(String o1, String o2) {
|
||||||
// count.getAndIncrement();
|
// count.getAndIncrement();
|
||||||
|
Loading…
Reference in New Issue
Block a user