From 3579e86d481ebe1493c500238f074922610d5557 Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 30 Sep 2017 23:52:29 +0200 Subject: [PATCH] Updated the parallel processor to take in tasks (instead of a runnable). This make it a lot more simple/logical to use. --- src/dorkbox/util/ParallelProcessor.java | 196 +++++++++++++++--------- 1 file changed, 120 insertions(+), 76 deletions(-) diff --git a/src/dorkbox/util/ParallelProcessor.java b/src/dorkbox/util/ParallelProcessor.java index 670bfed..e9664b5 100644 --- a/src/dorkbox/util/ParallelProcessor.java +++ b/src/dorkbox/util/ParallelProcessor.java @@ -23,76 +23,136 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; /** - * A utility class to simplify processing data/work/tasks on multiple threads. + * A Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that + * creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting *

* Remember that the JMM requires that empty 'synchronize' will not be optimized out by the compiler or JIT! *

- * This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC through object pools + * This is NOT the FASTEST implementation, but it is relatively easy and solid. */ @SuppressWarnings({"unused", "WeakerAccess"}) public abstract -class ParallelProcessor { +class ParallelProcessor { + public + interface Worker { + /** + * Runs the work. + * + * @return true if there was work done, otherwise false. + */ + boolean process(Task objectToProcess); + } + private static final Object SENTINEL = new Object[0]; - private final ArrayBlockingQueue workerPool; - + private final int numberOfThreads; private final ArrayList threads; private final ArrayBlockingQueue queue; private final CountDownLatch latch; - private int totalWorkload = 0; - private AtomicInteger currentProgress = new AtomicInteger(0); + private final int totalWorkload; + private final AtomicInteger currentProgress = new AtomicInteger(0); + /** + * Creates a Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that + * creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting + *

+ * This will not track the progress from (0-1) but instead will record the total number of processed tasks + * This will use the OS optimum number of threads (based on the CPU core count) + * This will not assign a logger and errors will be printed to std.err + */ public ParallelProcessor() { - this(OS.getOptimumNumberOfThreads(), null); + this(-1, OS.getOptimumNumberOfThreads(), null); } + /** + * Creates a Parallel processor to simplify processing data on multiple threads. + *

+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so + * that memory is constrained at the expense of CPU waiting + * + * This will use the OS optimum number of threads (based on the CPU core count) + * This will not assign a logger and errors will be printed to std.err + * + * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable + */ public - ParallelProcessor(final int numberOfThreads) { - this(numberOfThreads, null); + ParallelProcessor(final int totalWorkload) { + this(totalWorkload, OS.getOptimumNumberOfThreads(), null); } + /** + * Creates a Parallel processor to simplify processing data on multiple threads. + *

+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so + * that memory is constrained at the expense of CPU waiting + * + * This will not assign a logger and errors will be printed to std.err + * + * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable + * @param numberOfThreads this is the number of threads requested to do the work + */ public - ParallelProcessor(final int numberOfThreads, final Logger logger) { - latch = new CountDownLatch(numberOfThreads); + ParallelProcessor(final int totalWorkload, final int numberOfThreads) { + this(totalWorkload, numberOfThreads, null); + } - workerPool = new ArrayBlockingQueue(numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { - T e = createWorker(); - this.workerPool.add(e); - } + /** + * Creates a Parallel processor to simplify processing data on multiple threads. + *

+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so + * that memory is constrained at the expense of CPU waiting + * + * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable + * @param numberOfThreads this is the number of threads requested to do the work + * @param logger this is the logger to report errors (can be null) + */ + public + ParallelProcessor(final int totalWorkload, final int numberOfThreads, final Logger logger) { + this.totalWorkload = totalWorkload; + this.numberOfThreads = numberOfThreads; + + latch = new CountDownLatch(this.numberOfThreads); + queue = new ArrayBlockingQueue(numberOfThreads); - queue = new ArrayBlockingQueue(numberOfThreads * 2); threads = new ArrayList(numberOfThreads); ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread() .getThreadGroup(), "ParallelProcessor"); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup); for (int i = 0; i < numberOfThreads; i++) { - Runnable runnable = new Runnable() { + java.lang.Runnable runnable = new java.lang.Runnable() { @SuppressWarnings("unchecked") @Override public void run() { - ArrayBlockingQueue queue = ParallelProcessor.this.queue; + final ParallelProcessor processor = ParallelProcessor.this; + final ArrayBlockingQueue queue = processor.queue; + final Worker worker = createWorker(); - Object taken = null; + Object taken; while (true) { // we want to continue, even if there is an error (until we decide to shutdown). try { taken = queue.take(); + // only two types, the sentinel or the work to be done if (taken == SENTINEL) { latch.countDown(); return; } + } catch (Throwable ignored) { + // this thread was interrupted. Shouldn't ever really happen. + return; + } - T work = (T) taken; + Task task = (Task) taken; - // this does the work, and stores the result - work.run(); - workComplete(work); + try { + // this does the work + worker.process(task); + workComplete(ParallelProcessor.this, task); } catch (Throwable t) { if (logger != null) { logger.error("Error during execution of work!", t); @@ -101,17 +161,18 @@ class ParallelProcessor { t.printStackTrace(); } } finally { - if (taken instanceof Runnable) { - currentProgress.getAndIncrement(); - // Return object to the pool, waking the threads that have blocked during take() - ParallelProcessor.this.workerPool.offer((T) taken); + // record how much work was done + currentProgress.getAndIncrement(); + + // notify all threads that are waiting for processing to finish + synchronized (currentProgress) { + currentProgress.notifyAll(); } } } } }; - Thread runner = dispatchThreadFactory.newThread(runnable); this.threads.add(runner); } @@ -122,89 +183,72 @@ class ParallelProcessor { } /** - * Creates a worker to be placed into the worker pool. This is only called when necessary. + * Creates a worker which will perform work. */ public abstract - T createWorker(); + Worker createWorker(); /** * Called each time a single piece of work (a task) is completed. */ public abstract - void workComplete(T worker); + void workComplete(ParallelProcessor processor, Task task); /** - * Returns true if there are workers immediately available for work. + * Returns true if there are workers immediately able to do work. */ public boolean hasAvailableWorker() { - return !this.workerPool.isEmpty(); + return this.queue.size() < numberOfThreads; } /** - * Gets the next available worker, blocks until a worker is available. + * Queues task to be worked on * - * @throws InterruptedException if the current thread is interrupted while waiting + * @throws InterruptedException if the current thread is interrupted while waiting for a worker to process the task */ public - T nextWorker() throws InterruptedException { - return this.workerPool.take(); + void process(final Task taskToProcess) throws InterruptedException { + queue.put(taskToProcess); } /** - * Queues task to be completed - * - * @throws InterruptedException if the current thread is interrupted while waiting - */ - public - void queueTask(final T work) throws InterruptedException { - queue.put(work); - } - - /** - * Notifies the threads that no more work will be queued after this is called. - * - * @throws InterruptedException if the current thread is interrupted while waiting - */ - public - void doneQueueingTasks() throws InterruptedException { - for (int i = 0; i < threads.size(); i++) { - // this tells our threads that we have finished work and can exit - queue.put(SENTINEL); - } - } - - /** - * Waits for the results to finish processing. No more work should be queued after this is called. + * Waits for the results to finish processing. No more work can be done after this is called. * * @throws InterruptedException if the current thread is interrupted while waiting */ public void waitUntilDone() throws InterruptedException { - doneQueueingTasks(); + if (totalWorkload > 0) { + while (currentProgress.get() - totalWorkload != 0) { + synchronized (currentProgress) { + currentProgress.wait(10000L); // waits 10 seconds + } + } + } + + // stop all workers. + for (int i = 0; i < threads.size(); i++) { + // this tells our threads that we have finished work and can exit + queue.put(SENTINEL); + } latch.await(); } /** - * Sets the total amount of work to be performed. Also resets the count for the amount of work done. - */ - public - void setTotalWorkload(int totalWorkload) { - currentProgress.set(0); - this.totalWorkload = totalWorkload; - } - - /** - * Gets the amount of progress made, between 0-1 + * Gets the amount of progress made, between 0-1 OR return the number of tasks completed (if called with totalWorkload = -1). *

- * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()}. It will block, but only until the processing - * threads shutdown (which is quick) + * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()} while will block until the worker threads shutdown */ public float getProgress() { int i = currentProgress.get(); + if (this.totalWorkload == -1) { + return (float) i; + } + if (i == 0) { return 0.0f; }