Converted ParallelProcessor to use ArrayBlockingQueue. It's not the fastest (doesn't need to be), but it get's the job done, and it's simple
This commit is contained in:
parent
b8a251d056
commit
a73861d8ff
@ -15,27 +15,32 @@
|
|||||||
*/
|
*/
|
||||||
package dorkbox.util.parallel;
|
package dorkbox.util.parallel;
|
||||||
|
|
||||||
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
|
import dorkbox.util.NamedThreadFactory;
|
||||||
import dorkbox.util.objectPool.ObjectPool;
|
import dorkbox.objectPool.ObjectPool;
|
||||||
import dorkbox.util.objectPool.ObjectPoolFactory;
|
import dorkbox.objectPool.PoolableObject;
|
||||||
import dorkbox.util.objectPool.PoolableObject;
|
|
||||||
import org.jctools.queues.MpmcTransferArrayQueue;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC.
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
public
|
public
|
||||||
class ParallelProcessor<T extends ParallelTask> {
|
class ParallelProcessor<T extends ParallelTask> {
|
||||||
|
|
||||||
private final MpmcTransferArrayQueue<T> mtaq;
|
|
||||||
private final ObjectPool<T> pool;
|
private final ObjectPool<T> pool;
|
||||||
private final int totalTaskCount;
|
private final int totalTaskCount;
|
||||||
private final ParallelCollector<T> collector;
|
private final ParallelCollector<T> collector;
|
||||||
|
|
||||||
private final ArrayList<Thread> threads;
|
private final ArrayList<Thread> threads;
|
||||||
private final AtomicInteger processedCount;
|
private final AtomicInteger processedCount;
|
||||||
private final AtomicInteger assignedCount;
|
private final AtomicInteger assignedCount;
|
||||||
|
|
||||||
|
private final ArrayBlockingQueue<T> queue;
|
||||||
|
|
||||||
private volatile boolean isShuttingDown = false;
|
private volatile boolean isShuttingDown = false;
|
||||||
|
|
||||||
|
|
||||||
@ -46,32 +51,34 @@ class ParallelProcessor<T extends ParallelTask> {
|
|||||||
this.totalTaskCount = totalTaskCount - 1; // x-1 because our ID's start at 0, not 1
|
this.totalTaskCount = totalTaskCount - 1; // x-1 because our ID's start at 0, not 1
|
||||||
this.collector = collector;
|
this.collector = collector;
|
||||||
|
|
||||||
mtaq = new MpmcTransferArrayQueue<T>(numberOfThreads);
|
pool = new ObjectPool<T>(poolableObject, numberOfThreads);
|
||||||
pool = ObjectPoolFactory.create(poolableObject, numberOfThreads);
|
|
||||||
this.processedCount = new AtomicInteger();
|
this.processedCount = new AtomicInteger();
|
||||||
this.assignedCount = new AtomicInteger();
|
this.assignedCount = new AtomicInteger();
|
||||||
|
|
||||||
|
queue = new ArrayBlockingQueue<T>(numberOfThreads * 2);
|
||||||
threads = new ArrayList<Thread>(numberOfThreads);
|
threads = new ArrayList<Thread>(numberOfThreads);
|
||||||
|
|
||||||
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("ParallelProcessor");
|
ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread().getThreadGroup(), "ParallelProcessor");
|
||||||
|
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup);
|
||||||
for (int i = 0; i < numberOfThreads; i++) {
|
for (int i = 0; i < numberOfThreads; i++) {
|
||||||
Runnable runnable = new Runnable() {
|
Runnable runnable = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
void run() {
|
void run() {
|
||||||
MpmcTransferArrayQueue<T> IN_QUEUE = ParallelProcessor.this.mtaq;
|
ArrayBlockingQueue<T> queue = ParallelProcessor.this.queue;
|
||||||
|
|
||||||
//noinspection InfiniteLoopStatement
|
//noinspection InfiniteLoopStatement
|
||||||
while (true) {
|
while (true) {
|
||||||
// we want to continue, even if there is an error (until we decide to shutdown)
|
// we want to continue, even if there is an error (until we decide to shutdown).
|
||||||
|
// we get these threads to exit via `interrupt`
|
||||||
try {
|
try {
|
||||||
while (!isShuttingDown) {
|
while (!isShuttingDown) {
|
||||||
final T work = IN_QUEUE.take();
|
final T work = queue.take();
|
||||||
doWork(work);
|
doWork(work);
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (!isShuttingDown) {
|
if (!isShuttingDown) {
|
||||||
logger.error("Parallel task error!", t);
|
logger.error("Error during execution of parallel work!", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -83,9 +90,9 @@ class ParallelProcessor<T extends ParallelTask> {
|
|||||||
this.threads.add(runner);
|
this.threads.add(runner);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for (Thread thread : threads) {
|
for (Thread thread : threads) {
|
||||||
// thread.start();
|
thread.start();
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private
|
private
|
||||||
@ -99,11 +106,13 @@ class ParallelProcessor<T extends ParallelTask> {
|
|||||||
|
|
||||||
if (i == totalTaskCount) {
|
if (i == totalTaskCount) {
|
||||||
isShuttingDown = true;
|
isShuttingDown = true;
|
||||||
|
|
||||||
|
// whichever thread finishes, is the one that runs workComplete
|
||||||
|
collector.workComplete();
|
||||||
|
|
||||||
for (Thread thread : threads) {
|
for (Thread thread : threads) {
|
||||||
thread.interrupt();
|
thread.interrupt();
|
||||||
}
|
}
|
||||||
// whichever thread finishes, is the one that runs workComplete
|
|
||||||
collector.workComplete();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,6 +125,6 @@ class ParallelProcessor<T extends ParallelTask> {
|
|||||||
|
|
||||||
public
|
public
|
||||||
void execute(final T work) throws InterruptedException {
|
void execute(final T work) throws InterruptedException {
|
||||||
mtaq.transfer(work);
|
queue.put(work);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user