From 0ae980319b1d420fa4459edbe0db5b78325f9d84 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 7 Feb 2016 00:22:54 +0100 Subject: [PATCH] Cleaned up synchrony start/shutdown. Removed start (it was unnecessary), and shutdown now waits for threads to stop --- src/dorkbox/util/messagebus/MessageBus.java | 1 - .../util/messagebus/synchrony/AsyncABQ.java | 42 ++++++++++++------- .../messagebus/synchrony/AsyncABQ_noGc.java | 41 +++++++++++------- .../messagebus/synchrony/AsyncDisruptor.java | 32 +++++++------- .../util/messagebus/synchrony/Sync.java | 5 --- .../util/messagebus/synchrony/Synchrony.java | 1 - 6 files changed, 67 insertions(+), 55 deletions(-) diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 946ce80..25c61a6 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -233,7 +233,6 @@ class MessageBus implements IMessageBus { public void start() { errorHandler.init(); - asyncPublication.start(); } @Override diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java index 4205680..371eed0 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ.java @@ -22,8 +22,10 @@ import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.disruptor.MessageType; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.locks.LockSupport; /** * This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it @@ -38,6 +40,7 @@ class AsyncABQ implements Synchrony { private final ArrayBlockingQueue dispatchQueue; private final Collection threads; + private final Collection shutdown; /** * Notifies the consumers during shutdown, that it's on purpose. @@ -63,14 +66,21 @@ class AsyncABQ implements Synchrony { while (!AsyncABQ.this.shuttingDown) { process(IN_QUEUE, syncPublication1, errorHandler1); } + + synchronized (shutdown) { + shutdown.add(Boolean.TRUE); + } } }; this.threads = new ArrayDeque(numberOfThreads); + this.shutdown = new ArrayList(); + final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); for (int i = 0; i < numberOfThreads; i++) { - Thread runner = threadFactory.newThread(runnable); - this.threads.add(runner); + Thread thread = threadFactory.newThread(runnable); + this.threads.add(thread); + thread.start(); } } @@ -108,7 +118,7 @@ class AsyncABQ implements Synchrony { } } } catch (Throwable e) { - if (event != null) { + if (event != null && !this.shuttingDown) { switch (messageType) { case MessageType.ONE: { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") @@ -134,7 +144,6 @@ class AsyncABQ implements Synchrony { } } - @Override public void publish(final Subscription[] subscriptions, final Object message1) throws Throwable { @@ -174,17 +183,14 @@ class AsyncABQ implements Synchrony { this.dispatchQueue.put(take); } + @Override public - void start() { - if (shuttingDown) { - throw new Error("Unable to restart the MessageBus"); - } - - for (Thread t : this.threads) { - t.start(); - } + boolean hasPendingMessages() { + return !this.dispatchQueue.isEmpty(); } + @SuppressWarnings("Duplicates") + @Override public void shutdown() { this.shuttingDown = true; @@ -192,10 +198,14 @@ class AsyncABQ implements Synchrony { for (Thread t : this.threads) { t.interrupt(); } - } - public - boolean hasPendingMessages() { - return !this.dispatchQueue.isEmpty(); + while (true) { + synchronized (shutdown) { + if (shutdown.size() == threads.size()) { + return; + } + LockSupport.parkNanos(100L); // wait 100ms for threads to quit + } + } } } diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java index 400d48e..e4d79cd 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncABQ_noGc.java @@ -22,8 +22,10 @@ import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.synchrony.disruptor.MessageType; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.locks.LockSupport; /** * This is similar in behavior to the disruptor in that it does not generate garbage, however the downside of this implementation is it is @@ -42,6 +44,7 @@ class AsyncABQ_noGc implements Synchrony { private final ArrayBlockingQueue gcQueue; private final Collection threads; + private final Collection shutdown; /** * Notifies the consumers during shutdown, that it's on purpose. @@ -75,14 +78,21 @@ class AsyncABQ_noGc implements Synchrony { while (!AsyncABQ_noGc.this.shuttingDown) { process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1); } + + synchronized (shutdown) { + shutdown.add(Boolean.TRUE); + } } }; this.threads = new ArrayDeque(numberOfThreads); + this.shutdown = new ArrayList(); + final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus"); for (int i = 0; i < numberOfThreads; i++) { - Thread runner = threadFactory.newThread(runnable); - this.threads.add(runner); + Thread thread = threadFactory.newThread(runnable); + this.threads.add(thread); + thread.start(); } } @@ -126,7 +136,7 @@ class AsyncABQ_noGc implements Synchrony { } } } catch (Throwable e) { - if (event != null) { + if (event != null && !this.shuttingDown) { switch (messageType) { case MessageType.ONE: { errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") @@ -191,17 +201,14 @@ class AsyncABQ_noGc implements Synchrony { this.dispatchQueue.put(take); } + @Override public - void start() { - if (shuttingDown) { - throw new Error("Unable to restart the MessageBus"); - } - - for (Thread t : this.threads) { - t.start(); - } + boolean hasPendingMessages() { + return !this.dispatchQueue.isEmpty(); } + @SuppressWarnings("Duplicates") + @Override public void shutdown() { this.shuttingDown = true; @@ -209,10 +216,14 @@ class AsyncABQ_noGc implements Synchrony { for (Thread t : this.threads) { t.interrupt(); } - } - public - boolean hasPendingMessages() { - return !this.dispatchQueue.isEmpty(); + while (true) { + synchronized (shutdown) { + if (shutdown.size() == threads.size()) { + return; + } + LockSupport.parkNanos(100L); // wait 100ms for threads to quit + } + } } } diff --git a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java index 9149c97..10751b8 100644 --- a/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java +++ b/src/dorkbox/util/messagebus/synchrony/AsyncDisruptor.java @@ -166,23 +166,7 @@ class AsyncDisruptor implements Synchrony { } - public - void start() { - } - - public - void shutdown() { - for (WorkProcessor processor : workProcessors) { - processor.halt(); - } - - for (MessageHandler handler : handlers) { - while (!handler.isShutdown()) { - LockSupport.parkNanos(100L); // wait 100ms for handlers to quit - } - } - } - + @Override public boolean hasPendingMessages() { // from workerPool.drainAndHalt() @@ -196,4 +180,18 @@ class AsyncDisruptor implements Synchrony { return false; } + + @Override + public + void shutdown() { + for (WorkProcessor processor : workProcessors) { + processor.halt(); + } + + for (MessageHandler handler : handlers) { + while (!handler.isShutdown()) { + LockSupport.parkNanos(100L); // wait 100ms for handlers to quit + } + } + } } diff --git a/src/dorkbox/util/messagebus/synchrony/Sync.java b/src/dorkbox/util/messagebus/synchrony/Sync.java index 53fbaea..7be6cf6 100644 --- a/src/dorkbox/util/messagebus/synchrony/Sync.java +++ b/src/dorkbox/util/messagebus/synchrony/Sync.java @@ -51,11 +51,6 @@ class Sync implements Synchrony { } } - @Override - public - void start() { - } - @Override public void shutdown() { diff --git a/src/dorkbox/util/messagebus/synchrony/Synchrony.java b/src/dorkbox/util/messagebus/synchrony/Synchrony.java index 6de5eca..21afea5 100644 --- a/src/dorkbox/util/messagebus/synchrony/Synchrony.java +++ b/src/dorkbox/util/messagebus/synchrony/Synchrony.java @@ -26,7 +26,6 @@ interface Synchrony { void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ; void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ; - void start(); void shutdown(); boolean hasPendingMessages(); }