Cleaned up synchrony start/shutdown. Removed start (it was unnecessary), and shutdown now waits for threads to stop

This commit is contained in:
nathan 2016-02-07 00:22:54 +01:00
parent 89f47fdd8a
commit 0ae980319b
6 changed files with 67 additions and 55 deletions

View File

@ -233,7 +233,6 @@ class MessageBus implements IMessageBus {
public
void start() {
errorHandler.init();
asyncPublication.start();
}
@Override

View File

@ -22,8 +22,10 @@ import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.disruptor.MessageType;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport;
/**
* This is similar to the disruptor, however the downside of this implementation is that, while faster than the no-gc version, it
@ -38,6 +40,7 @@ class AsyncABQ implements Synchrony {
private final ArrayBlockingQueue<MessageHolder> dispatchQueue;
private final Collection<Thread> threads;
private final Collection<Boolean> shutdown;
/**
* Notifies the consumers during shutdown, that it's on purpose.
@ -63,14 +66,21 @@ class AsyncABQ implements Synchrony {
while (!AsyncABQ.this.shuttingDown) {
process(IN_QUEUE, syncPublication1, errorHandler1);
}
synchronized (shutdown) {
shutdown.add(Boolean.TRUE);
}
}
};
this.threads = new ArrayDeque<Thread>(numberOfThreads);
this.shutdown = new ArrayList<Boolean>();
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
Thread thread = threadFactory.newThread(runnable);
this.threads.add(thread);
thread.start();
}
}
@ -108,7 +118,7 @@ class AsyncABQ implements Synchrony {
}
}
} catch (Throwable e) {
if (event != null) {
if (event != null && !this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
@ -134,7 +144,6 @@ class AsyncABQ implements Synchrony {
}
}
@Override
public
void publish(final Subscription[] subscriptions, final Object message1) throws Throwable {
@ -174,17 +183,14 @@ class AsyncABQ implements Synchrony {
this.dispatchQueue.put(take);
}
@Override
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
for (Thread t : this.threads) {
t.start();
}
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
}
@SuppressWarnings("Duplicates")
@Override
public
void shutdown() {
this.shuttingDown = true;
@ -192,10 +198,14 @@ class AsyncABQ implements Synchrony {
for (Thread t : this.threads) {
t.interrupt();
}
}
public
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
while (true) {
synchronized (shutdown) {
if (shutdown.size() == threads.size()) {
return;
}
LockSupport.parkNanos(100L); // wait 100ms for threads to quit
}
}
}
}

View File

@ -22,8 +22,10 @@ import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.synchrony.disruptor.MessageType;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport;
/**
* This is similar in behavior to the disruptor in that it does not generate garbage, however the downside of this implementation is it is
@ -42,6 +44,7 @@ class AsyncABQ_noGc implements Synchrony {
private final ArrayBlockingQueue<MessageHolder> gcQueue;
private final Collection<Thread> threads;
private final Collection<Boolean> shutdown;
/**
* Notifies the consumers during shutdown, that it's on purpose.
@ -75,14 +78,21 @@ class AsyncABQ_noGc implements Synchrony {
while (!AsyncABQ_noGc.this.shuttingDown) {
process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1);
}
synchronized (shutdown) {
shutdown.add(Boolean.TRUE);
}
}
};
this.threads = new ArrayDeque<Thread>(numberOfThreads);
this.shutdown = new ArrayList<Boolean>();
final NamedThreadFactory threadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
Thread runner = threadFactory.newThread(runnable);
this.threads.add(runner);
Thread thread = threadFactory.newThread(runnable);
this.threads.add(thread);
thread.start();
}
}
@ -126,7 +136,7 @@ class AsyncABQ_noGc implements Synchrony {
}
}
} catch (Throwable e) {
if (event != null) {
if (event != null && !this.shuttingDown) {
switch (messageType) {
case MessageType.ONE: {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
@ -191,17 +201,14 @@ class AsyncABQ_noGc implements Synchrony {
this.dispatchQueue.put(take);
}
@Override
public
void start() {
if (shuttingDown) {
throw new Error("Unable to restart the MessageBus");
}
for (Thread t : this.threads) {
t.start();
}
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
}
@SuppressWarnings("Duplicates")
@Override
public
void shutdown() {
this.shuttingDown = true;
@ -209,10 +216,14 @@ class AsyncABQ_noGc implements Synchrony {
for (Thread t : this.threads) {
t.interrupt();
}
}
public
boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
while (true) {
synchronized (shutdown) {
if (shutdown.size() == threads.size()) {
return;
}
LockSupport.parkNanos(100L); // wait 100ms for threads to quit
}
}
}
}

View File

@ -166,23 +166,7 @@ class AsyncDisruptor implements Synchrony {
}
public
void start() {
}
public
void shutdown() {
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
}
@Override
public
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
@ -196,4 +180,18 @@ class AsyncDisruptor implements Synchrony {
return false;
}
@Override
public
void shutdown() {
for (WorkProcessor<?> processor : workProcessors) {
processor.halt();
}
for (MessageHandler handler : handlers) {
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
}
}

View File

@ -51,11 +51,6 @@ class Sync implements Synchrony {
}
}
@Override
public
void start() {
}
@Override
public
void shutdown() {

View File

@ -26,7 +26,6 @@ interface Synchrony {
void publish(final Subscription[] subscriptions, Object message1, Object message2) throws Throwable ;
void publish(final Subscription[] subscriptions, Object message1, Object message2, Object message3) throws Throwable ;
void start();
void shutdown();
boolean hasPendingMessages();
}