From 320b7ea7385ba604f9a9364eef9c752b24d21b81 Mon Sep 17 00:00:00 2001 From: Robinson Date: Fri, 9 Apr 2021 20:19:23 +0200 Subject: [PATCH] Correct check for more messages in LMAX --- .../messageBus/publication/LmaxDisruptor.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/dorkbox/messageBus/publication/LmaxDisruptor.java b/src/dorkbox/messageBus/publication/LmaxDisruptor.java index bfe9dc2..7d579d4 100644 --- a/src/dorkbox/messageBus/publication/LmaxDisruptor.java +++ b/src/dorkbox/messageBus/publication/LmaxDisruptor.java @@ -31,6 +31,7 @@ import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkProcessor; +import com.lmax.disruptor.util.Util; import dorkbox.messageBus.error.ErrorHandler; import dorkbox.messageBus.publication.disruptor.EventBusFactory; @@ -303,20 +304,14 @@ class LmaxDisruptor implements Publisher { ringBuffer.publish(seq); } + @Override public boolean hasPendingMessages() { - // from workerPool.drainAndHalt() - Sequence[] workerSequences = getSequences(); - final long cursor = ringBuffer.getCursor(); - for (Sequence s : workerSequences) { - if (cursor > s.get()) { - return true; - } - } - - return false; + // modified from workerPool.drainAndHalt() + return ringBuffer.getCursor() > Util.getMinimumSequence(getSequences()); } + @Override public void shutdown() { // let some messages finish publishing