From 2f9a0da32f670acc272f4c080fc913c4ae9547fb Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 14 Aug 2017 12:19:25 +0200 Subject: [PATCH] Added ability to set the total workload, then get the current progress -- without blocking --- src/dorkbox/util/ParallelProcessor.java | 65 ++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/dorkbox/util/ParallelProcessor.java b/src/dorkbox/util/ParallelProcessor.java index 0fff83c..9bd9e54 100644 --- a/src/dorkbox/util/ParallelProcessor.java +++ b/src/dorkbox/util/ParallelProcessor.java @@ -18,6 +18,7 @@ package dorkbox.util; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -39,11 +40,19 @@ class ParallelProcessor { private final ArrayBlockingQueue queue; private final CountDownLatch latch; + private int totalWorkload = 0; + private AtomicInteger currentProgress = new AtomicInteger(0); + public ParallelProcessor() { this(OS.getOptimumNumberOfThreads(), null); } + public + ParallelProcessor(final int numberOfThreads) { + this(numberOfThreads, null); + } + public ParallelProcessor(final int numberOfThreads, final Logger logger) { latch = new CountDownLatch(numberOfThreads); @@ -98,6 +107,7 @@ class ParallelProcessor { } } finally { if (taken instanceof Runnable) { + currentProgress.getAndIncrement(); // Return object to the pool, waking the threads that have blocked during take() ParallelProcessor.this.workerPool.offer((T) taken); } @@ -128,10 +138,12 @@ class ParallelProcessor { public abstract void workComplete(T worker); - + /** + * Returns true if there are workers immediately available for work. + */ public - ParallelProcessor(final int numberOfThreads) { - this(numberOfThreads, null); + boolean hasAvailableWorker() { + return !this.workerPool.isEmpty(); } /** @@ -154,6 +166,19 @@ class ParallelProcessor { queue.put(work); } + /** + * Notifies the threads that no more work will be queued after this is called. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public + void doneQueueingTasks() throws InterruptedException { + for (int i = 0; i < threads.size(); i++) { + // this tells out threads that we have finished work and can exit + queue.put(SENTINEL); + } + } + /** * Waits for the results to finish processing. No more work should be queued after this is called. * @@ -161,11 +186,37 @@ class ParallelProcessor { */ public void waitUntilDone() throws InterruptedException { - for (int i = 0; i < threads.size(); i++) { - // this tells out threads that we have finished work and can exit - queue.put(SENTINEL); - } + doneQueueingTasks(); latch.await(); } + + /** + * Sets the total amount of work to be performed. Also resets the count for the amount of work done. + */ + public + void setTotalWorkload(int totalWorkload) { + currentProgress.set(0); + this.totalWorkload = totalWorkload; + } + + /** + * Gets the amount of progress made, between 0-1 + *

+ * If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()}. It will block, but only until the processing + * threads shutdown (which is quick) + */ + public + float getProgress() { + int i = currentProgress.get(); + + if (i == 0) { + return 0.0f; + } + if (i == totalWorkload) { + return 1.0f; + } + + return (float) i / (float) totalWorkload; + } }