diff --git a/src/dorkbox/util/ParallelProcessor.java b/src/dorkbox/util/ParallelProcessor.java
index 670bfed..e9664b5 100644
--- a/src/dorkbox/util/ParallelProcessor.java
+++ b/src/dorkbox/util/ParallelProcessor.java
@@ -23,76 +23,136 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
/**
- * A utility class to simplify processing data/work/tasks on multiple threads.
+ * A Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that
+ * creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting
*
* Remember that the JMM requires that empty 'synchronize' will not be optimized out by the compiler or JIT!
*
- * This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC through object pools
+ * This is NOT the FASTEST implementation, but it is relatively easy and solid.
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public abstract
-class ParallelProcessor {
+class ParallelProcessor {
+ public
+ interface Worker {
+ /**
+ * Runs the work.
+ *
+ * @return true if there was work done, otherwise false.
+ */
+ boolean process(Task objectToProcess);
+ }
+
private static final Object SENTINEL = new Object[0];
- private final ArrayBlockingQueue workerPool;
-
+ private final int numberOfThreads;
private final ArrayList threads;
private final ArrayBlockingQueue queue;
private final CountDownLatch latch;
- private int totalWorkload = 0;
- private AtomicInteger currentProgress = new AtomicInteger(0);
+ private final int totalWorkload;
+ private final AtomicInteger currentProgress = new AtomicInteger(0);
+ /**
+ * Creates a Parallel processor to simplify processing data on multiple threads and provide back-pressure to the main thread (that
+ * creates the processor and adds work to it), so that memory is constrained at the expense of CPU waiting
+ *
+ * This will not track the progress from (0-1) but instead will record the total number of processed tasks
+ * This will use the OS optimum number of threads (based on the CPU core count)
+ * This will not assign a logger and errors will be printed to std.err
+ */
public
ParallelProcessor() {
- this(OS.getOptimumNumberOfThreads(), null);
+ this(-1, OS.getOptimumNumberOfThreads(), null);
}
+ /**
+ * Creates a Parallel processor to simplify processing data on multiple threads.
+ *
+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
+ * that memory is constrained at the expense of CPU waiting
+ *
+ * This will use the OS optimum number of threads (based on the CPU core count)
+ * This will not assign a logger and errors will be printed to std.err
+ *
+ * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
+ */
public
- ParallelProcessor(final int numberOfThreads) {
- this(numberOfThreads, null);
+ ParallelProcessor(final int totalWorkload) {
+ this(totalWorkload, OS.getOptimumNumberOfThreads(), null);
}
+ /**
+ * Creates a Parallel processor to simplify processing data on multiple threads.
+ *
+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
+ * that memory is constrained at the expense of CPU waiting
+ *
+ * This will not assign a logger and errors will be printed to std.err
+ *
+ * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
+ * @param numberOfThreads this is the number of threads requested to do the work
+ */
public
- ParallelProcessor(final int numberOfThreads, final Logger logger) {
- latch = new CountDownLatch(numberOfThreads);
+ ParallelProcessor(final int totalWorkload, final int numberOfThreads) {
+ this(totalWorkload, numberOfThreads, null);
+ }
- workerPool = new ArrayBlockingQueue(numberOfThreads);
- for (int i = 0; i < numberOfThreads; i++) {
- T e = createWorker();
- this.workerPool.add(e);
- }
+ /**
+ * Creates a Parallel processor to simplify processing data on multiple threads.
+ *
+ * This implementation will provide back-pressure to the main thread (that creates the processor and adds work to it), so
+ * that memory is constrained at the expense of CPU waiting
+ *
+ * @param totalWorkload this is the total number of elements that need to be processed. -1 to disable
+ * @param numberOfThreads this is the number of threads requested to do the work
+ * @param logger this is the logger to report errors (can be null)
+ */
+ public
+ ParallelProcessor(final int totalWorkload, final int numberOfThreads, final Logger logger) {
+ this.totalWorkload = totalWorkload;
+ this.numberOfThreads = numberOfThreads;
+
+ latch = new CountDownLatch(this.numberOfThreads);
+ queue = new ArrayBlockingQueue(numberOfThreads);
- queue = new ArrayBlockingQueue(numberOfThreads * 2);
threads = new ArrayList(numberOfThreads);
ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread()
.getThreadGroup(), "ParallelProcessor");
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup);
for (int i = 0; i < numberOfThreads; i++) {
- Runnable runnable = new Runnable() {
+ java.lang.Runnable runnable = new java.lang.Runnable() {
@SuppressWarnings("unchecked")
@Override
public
void run() {
- ArrayBlockingQueue queue = ParallelProcessor.this.queue;
+ final ParallelProcessor processor = ParallelProcessor.this;
+ final ArrayBlockingQueue queue = processor.queue;
+ final Worker worker = createWorker();
- Object taken = null;
+ Object taken;
while (true) {
// we want to continue, even if there is an error (until we decide to shutdown).
try {
taken = queue.take();
+
// only two types, the sentinel or the work to be done
if (taken == SENTINEL) {
latch.countDown();
return;
}
+ } catch (Throwable ignored) {
+ // this thread was interrupted. Shouldn't ever really happen.
+ return;
+ }
- T work = (T) taken;
+ Task task = (Task) taken;
- // this does the work, and stores the result
- work.run();
- workComplete(work);
+ try {
+ // this does the work
+ worker.process(task);
+ workComplete(ParallelProcessor.this, task);
} catch (Throwable t) {
if (logger != null) {
logger.error("Error during execution of work!", t);
@@ -101,17 +161,18 @@ class ParallelProcessor {
t.printStackTrace();
}
} finally {
- if (taken instanceof Runnable) {
- currentProgress.getAndIncrement();
- // Return object to the pool, waking the threads that have blocked during take()
- ParallelProcessor.this.workerPool.offer((T) taken);
+ // record how much work was done
+ currentProgress.getAndIncrement();
+
+ // notify all threads that are waiting for processing to finish
+ synchronized (currentProgress) {
+ currentProgress.notifyAll();
}
}
}
}
};
-
Thread runner = dispatchThreadFactory.newThread(runnable);
this.threads.add(runner);
}
@@ -122,89 +183,72 @@ class ParallelProcessor {
}
/**
- * Creates a worker to be placed into the worker pool. This is only called when necessary.
+ * Creates a worker which will perform work.
*/
public abstract
- T createWorker();
+ Worker createWorker();
/**
* Called each time a single piece of work (a task) is completed.
*/
public abstract
- void workComplete(T worker);
+ void workComplete(ParallelProcessor processor, Task task);
/**
- * Returns true if there are workers immediately available for work.
+ * Returns true if there are workers immediately able to do work.
*/
public
boolean hasAvailableWorker() {
- return !this.workerPool.isEmpty();
+ return this.queue.size() < numberOfThreads;
}
/**
- * Gets the next available worker, blocks until a worker is available.
+ * Queues task to be worked on
*
- * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws InterruptedException if the current thread is interrupted while waiting for a worker to process the task
*/
public
- T nextWorker() throws InterruptedException {
- return this.workerPool.take();
+ void process(final Task taskToProcess) throws InterruptedException {
+ queue.put(taskToProcess);
}
/**
- * Queues task to be completed
- *
- * @throws InterruptedException if the current thread is interrupted while waiting
- */
- public
- void queueTask(final T work) throws InterruptedException {
- queue.put(work);
- }
-
- /**
- * Notifies the threads that no more work will be queued after this is called.
- *
- * @throws InterruptedException if the current thread is interrupted while waiting
- */
- public
- void doneQueueingTasks() throws InterruptedException {
- for (int i = 0; i < threads.size(); i++) {
- // this tells our threads that we have finished work and can exit
- queue.put(SENTINEL);
- }
- }
-
- /**
- * Waits for the results to finish processing. No more work should be queued after this is called.
+ * Waits for the results to finish processing. No more work can be done after this is called.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public
void waitUntilDone() throws InterruptedException {
- doneQueueingTasks();
+ if (totalWorkload > 0) {
+ while (currentProgress.get() - totalWorkload != 0) {
+ synchronized (currentProgress) {
+ currentProgress.wait(10000L); // waits 10 seconds
+ }
+ }
+ }
+
+ // stop all workers.
+ for (int i = 0; i < threads.size(); i++) {
+ // this tells our threads that we have finished work and can exit
+ queue.put(SENTINEL);
+ }
latch.await();
}
/**
- * Sets the total amount of work to be performed. Also resets the count for the amount of work done.
- */
- public
- void setTotalWorkload(int totalWorkload) {
- currentProgress.set(0);
- this.totalWorkload = totalWorkload;
- }
-
- /**
- * Gets the amount of progress made, between 0-1
+ * Gets the amount of progress made, between 0-1 OR return the number of tasks completed (if called with totalWorkload = -1).
*
- * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()}. It will block, but only until the processing
- * threads shutdown (which is quick)
+ * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()} while will block until the worker threads shutdown
*/
public
float getProgress() {
int i = currentProgress.get();
+ if (this.totalWorkload == -1) {
+ return (float) i;
+ }
+
if (i == 0) {
return 0.0f;
}