Updated the parallel processor to take in tasks (instead of a runnable). This make it a lot more simple/logical to use.

This commit is contained in:
nathan 2017-09-30 23:52:29 +02:00
parent fb246ee164
commit 3579e86d48
1 changed files with 120 additions and 76 deletions

View File

@ -23,76 +23,136 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger; import org.slf4j.Logger;
/** /**
* A utility class to simplify processing data/work/tasks on multiple threads. * A Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that
* creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting
* <p> * <p>
* Remember that the JMM requires that empty 'synchronize' will not be optimized out by the compiler or JIT! * Remember that the JMM requires that empty 'synchronize' will not be optimized out by the compiler or JIT!
* <p> * <p>
* This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC through object pools * This is NOT the FASTEST implementation, but it is relatively easy and solid.
*/ */
@SuppressWarnings({"unused", "WeakerAccess"}) @SuppressWarnings({"unused", "WeakerAccess"})
public abstract public abstract
class ParallelProcessor<T extends Runnable> { class ParallelProcessor<Task> {
public
interface Worker<Task> {
/**
* Runs the work.
*
* @return true if there was work done, otherwise false.
*/
boolean process(Task objectToProcess);
}
private static final Object SENTINEL = new Object[0]; private static final Object SENTINEL = new Object[0];
private final ArrayBlockingQueue<T> workerPool; private final int numberOfThreads;
private final ArrayList<Thread> threads; private final ArrayList<Thread> threads;
private final ArrayBlockingQueue<Object> queue; private final ArrayBlockingQueue<Object> queue;
private final CountDownLatch latch; private final CountDownLatch latch;
private int totalWorkload = 0; private final int totalWorkload;
private AtomicInteger currentProgress = new AtomicInteger(0); private final AtomicInteger currentProgress = new AtomicInteger(0);
/**
* Creates a Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that
* creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting
* <p>
* This will not track the progress from (0-1) but instead will record the total number of processed tasks
* This will use the OS optimum number of threads (based on the CPU core count)
* This will not assign a logger and errors will be printed to std.err
*/
public public
ParallelProcessor() { ParallelProcessor() {
this(OS.getOptimumNumberOfThreads(), null); this(-1, OS.getOptimumNumberOfThreads(), null);
} }
/**
* Creates a Parallel processor to simplify processing data on multiple threads.
* <p>
* This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
* that memory is constrained at the expense of CPU waiting
*
* This will use the OS optimum number of threads (based on the CPU core count)
* This will not assign a logger and errors will be printed to std.err
*
* @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
*/
public public
ParallelProcessor(final int numberOfThreads) { ParallelProcessor(final int totalWorkload) {
this(numberOfThreads, null); this(totalWorkload, OS.getOptimumNumberOfThreads(), null);
} }
/**
* Creates a Parallel processor to simplify processing data on multiple threads.
* <p>
* This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
* that memory is constrained at the expense of CPU waiting
*
* This will not assign a logger and errors will be printed to std.err
*
* @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
* @param numberOfThreads this is the number of threads requested to do the work
*/
public public
ParallelProcessor(final int numberOfThreads, final Logger logger) { ParallelProcessor(final int totalWorkload, final int numberOfThreads) {
latch = new CountDownLatch(numberOfThreads); this(totalWorkload, numberOfThreads, null);
workerPool = new ArrayBlockingQueue<T>(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
T e = createWorker();
this.workerPool.add(e);
} }
queue = new ArrayBlockingQueue<Object>(numberOfThreads * 2); /**
* Creates a Parallel processor to simplify processing data on multiple threads.
* <p>
* This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
* that memory is constrained at the expense of CPU waiting
*
* @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
* @param numberOfThreads this is the number of threads requested to do the work
* @param logger this is the logger to report errors (can be null)
*/
public
ParallelProcessor(final int totalWorkload, final int numberOfThreads, final Logger logger) {
this.totalWorkload = totalWorkload;
this.numberOfThreads = numberOfThreads;
latch = new CountDownLatch(this.numberOfThreads);
queue = new ArrayBlockingQueue<Object>(numberOfThreads);
threads = new ArrayList<Thread>(numberOfThreads); threads = new ArrayList<Thread>(numberOfThreads);
ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread() ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread()
.getThreadGroup(), "ParallelProcessor"); .getThreadGroup(), "ParallelProcessor");
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup);
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
Runnable runnable = new Runnable() { java.lang.Runnable runnable = new java.lang.Runnable() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public public
void run() { void run() {
ArrayBlockingQueue<Object> queue = ParallelProcessor.this.queue; final ParallelProcessor<Task> processor = ParallelProcessor.this;
final ArrayBlockingQueue<Object> queue = processor.queue;
final Worker worker = createWorker();
Object taken = null; Object taken;
while (true) { while (true) {
// we want to continue, even if there is an error (until we decide to shutdown). // we want to continue, even if there is an error (until we decide to shutdown).
try { try {
taken = queue.take(); taken = queue.take();
// only two types, the sentinel or the work to be done // only two types, the sentinel or the work to be done
if (taken == SENTINEL) { if (taken == SENTINEL) {
latch.countDown(); latch.countDown();
return; return;
} }
} catch (Throwable ignored) {
// this thread was interrupted. Shouldn't ever really happen.
return;
}
T work = (T) taken; Task task = (Task) taken;
// this does the work, and stores the result try {
work.run(); // this does the work
workComplete(work); worker.process(task);
workComplete(ParallelProcessor.this, task);
} catch (Throwable t) { } catch (Throwable t) {
if (logger != null) { if (logger != null) {
logger.error("Error during execution of work!", t); logger.error("Error during execution of work!", t);
@ -101,17 +161,18 @@ class ParallelProcessor<T extends Runnable> {
t.printStackTrace(); t.printStackTrace();
} }
} finally { } finally {
if (taken instanceof Runnable) { // record how much work was done
currentProgress.getAndIncrement(); currentProgress.getAndIncrement();
// Return object to the pool, waking the threads that have blocked during take()
ParallelProcessor.this.workerPool.offer((T) taken); // notify all threads that are waiting for processing to finish
synchronized (currentProgress) {
currentProgress.notifyAll();
} }
} }
} }
} }
}; };
Thread runner = dispatchThreadFactory.newThread(runnable); Thread runner = dispatchThreadFactory.newThread(runnable);
this.threads.add(runner); this.threads.add(runner);
} }
@ -122,89 +183,72 @@ class ParallelProcessor<T extends Runnable> {
} }
/** /**
* Creates a worker to be placed into the worker pool. This is only called when necessary. * Creates a worker which will perform work.
*/ */
public abstract public abstract
T createWorker(); Worker createWorker();
/** /**
* Called each time a single piece of work (a task) is completed. * Called each time a single piece of work (a task) is completed.
*/ */
public abstract public abstract
void workComplete(T worker); void workComplete(ParallelProcessor processor, Task task);
/** /**
* Returns true if there are workers immediately available for work. * Returns true if there are workers immediately able to do work.
*/ */
public public
boolean hasAvailableWorker() { boolean hasAvailableWorker() {
return !this.workerPool.isEmpty(); return this.queue.size() < numberOfThreads;
} }
/** /**
* Gets the next available worker, blocks until a worker is available. * Queues task to be worked on
* *
* @throws InterruptedException if the current thread is interrupted while waiting * @throws InterruptedException if the current thread is interrupted while waiting for a worker to process the task
*/ */
public public
T nextWorker() throws InterruptedException { void process(final Task taskToProcess) throws InterruptedException {
return this.workerPool.take(); queue.put(taskToProcess);
} }
/** /**
* Queues task to be completed * Waits for the results to finish processing. No more work can be done after this is called.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public
void queueTask(final T work) throws InterruptedException {
queue.put(work);
}
/**
* Notifies the threads that no more work will be queued after this is called.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public
void doneQueueingTasks() throws InterruptedException {
for (int i = 0; i < threads.size(); i++) {
// this tells our threads that we have finished work and can exit
queue.put(SENTINEL);
}
}
/**
* Waits for the results to finish processing. No more work should be queued after this is called.
* *
* @throws InterruptedException if the current thread is interrupted while waiting * @throws InterruptedException if the current thread is interrupted while waiting
*/ */
public public
void waitUntilDone() throws InterruptedException { void waitUntilDone() throws InterruptedException {
doneQueueingTasks(); if (totalWorkload > 0) {
while (currentProgress.get() - totalWorkload != 0) {
synchronized (currentProgress) {
currentProgress.wait(10000L); // waits 10 seconds
}
}
}
// stop all workers.
for (int i = 0; i < threads.size(); i++) {
// this tells our threads that we have finished work and can exit
queue.put(SENTINEL);
}
latch.await(); latch.await();
} }
/** /**
* Sets the total amount of work to be performed. Also resets the count for the amount of work done. * Gets the amount of progress made, between 0-1 OR return the number of tasks completed (if called with totalWorkload = -1).
*/
public
void setTotalWorkload(int totalWorkload) {
currentProgress.set(0);
this.totalWorkload = totalWorkload;
}
/**
* Gets the amount of progress made, between 0-1
* <p> * <p>
* If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()}. It will block, but only until the processing * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()} while will block until the worker threads shutdown
* threads shutdown (which is quick)
*/ */
public public
float getProgress() { float getProgress() {
int i = currentProgress.get(); int i = currentProgress.get();
if (this.totalWorkload == -1) {
return (float) i;
}
if (i == 0) { if (i == 0) {
return 0.0f; return 0.0f;
} }