Added ability to set the total workload, then get the current progress -- without blocking

This commit is contained in:
nathan 2017-08-14 12:19:25 +02:00
parent c927093388
commit 2f9a0da32f
1 changed files with 58 additions and 7 deletions

View File

@ -18,6 +18,7 @@ package dorkbox.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -39,11 +40,19 @@ class ParallelProcessor<T extends Runnable> {
private final ArrayBlockingQueue<Object> queue; private final ArrayBlockingQueue<Object> queue;
private final CountDownLatch latch; private final CountDownLatch latch;
private int totalWorkload = 0;
private AtomicInteger currentProgress = new AtomicInteger(0);
public public
ParallelProcessor() { ParallelProcessor() {
this(OS.getOptimumNumberOfThreads(), null); this(OS.getOptimumNumberOfThreads(), null);
} }
public
ParallelProcessor(final int numberOfThreads) {
this(numberOfThreads, null);
}
public public
ParallelProcessor(final int numberOfThreads, final Logger logger) { ParallelProcessor(final int numberOfThreads, final Logger logger) {
latch = new CountDownLatch(numberOfThreads); latch = new CountDownLatch(numberOfThreads);
@ -98,6 +107,7 @@ class ParallelProcessor<T extends Runnable> {
} }
} finally { } finally {
if (taken instanceof Runnable) { if (taken instanceof Runnable) {
currentProgress.getAndIncrement();
// Return object to the pool, waking the threads that have blocked during take() // Return object to the pool, waking the threads that have blocked during take()
ParallelProcessor.this.workerPool.offer((T) taken); ParallelProcessor.this.workerPool.offer((T) taken);
} }
@ -128,10 +138,12 @@ class ParallelProcessor<T extends Runnable> {
public abstract public abstract
void workComplete(T worker); void workComplete(T worker);
/**
* Returns true if there are workers immediately available for work.
*/
public public
ParallelProcessor(final int numberOfThreads) { boolean hasAvailableWorker() {
this(numberOfThreads, null); return !this.workerPool.isEmpty();
} }
/** /**
@ -154,6 +166,19 @@ class ParallelProcessor<T extends Runnable> {
queue.put(work); queue.put(work);
} }
/**
* Notifies the threads that no more work will be queued after this is called.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public
void doneQueueingTasks() throws InterruptedException {
for (int i = 0; i < threads.size(); i++) {
// this tells out threads that we have finished work and can exit
queue.put(SENTINEL);
}
}
/** /**
* Waits for the results to finish processing. No more work should be queued after this is called. * Waits for the results to finish processing. No more work should be queued after this is called.
* *
@ -161,11 +186,37 @@ class ParallelProcessor<T extends Runnable> {
*/ */
public public
void waitUntilDone() throws InterruptedException { void waitUntilDone() throws InterruptedException {
for (int i = 0; i < threads.size(); i++) { doneQueueingTasks();
// this tells out threads that we have finished work and can exit
queue.put(SENTINEL);
}
latch.await(); latch.await();
} }
/**
* Sets the total amount of work to be performed. Also resets the count for the amount of work done.
*/
public
void setTotalWorkload(int totalWorkload) {
currentProgress.set(0);
this.totalWorkload = totalWorkload;
}
/**
* Gets the amount of progress made, between 0-1
* <p>
* If this returns 0, it is safe to call {@link ParallelProcessor#waitUntilDone()}. It will block, but only until the processing
* threads shutdown (which is quick)
*/
public
float getProgress() {
int i = currentProgress.get();
if (i == 0) {
return 0.0f;
}
if (i == totalWorkload) {
return 1.0f;
}
return (float) i / (float) totalWorkload;
}
} }