diff --git a/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java index 840252d..c134b05 100644 --- a/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java +++ b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java @@ -15,27 +15,32 @@ */ package dorkbox.util.parallel; -import dorkbox.util.messagebus.common.thread.NamedThreadFactory; -import dorkbox.util.objectPool.ObjectPool; -import dorkbox.util.objectPool.ObjectPoolFactory; -import dorkbox.util.objectPool.PoolableObject; -import org.jctools.queues.MpmcTransferArrayQueue; +import dorkbox.util.NamedThreadFactory; +import dorkbox.objectPool.ObjectPool; +import dorkbox.objectPool.PoolableObject; import org.slf4j.Logger; import java.util.ArrayList; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +/** + * This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC. + * @param + */ public class ParallelProcessor { - private final MpmcTransferArrayQueue mtaq; private final ObjectPool pool; private final int totalTaskCount; private final ParallelCollector collector; + private final ArrayList threads; private final AtomicInteger processedCount; private final AtomicInteger assignedCount; + private final ArrayBlockingQueue queue; + private volatile boolean isShuttingDown = false; @@ -46,32 +51,34 @@ class ParallelProcessor { this.totalTaskCount = totalTaskCount - 1; // x-1 because our ID's start at 0, not 1 this.collector = collector; - mtaq = new MpmcTransferArrayQueue(numberOfThreads); - pool = ObjectPoolFactory.create(poolableObject, numberOfThreads); + pool = new ObjectPool(poolableObject, numberOfThreads); this.processedCount = new AtomicInteger(); this.assignedCount = new AtomicInteger(); + queue = new ArrayBlockingQueue(numberOfThreads * 2); threads = new ArrayList(numberOfThreads); - NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("ParallelProcessor"); + ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread().getThreadGroup(), "ParallelProcessor"); + NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup); for (int i = 0; i < numberOfThreads; i++) { Runnable runnable = new Runnable() { @Override public void run() { - MpmcTransferArrayQueue IN_QUEUE = ParallelProcessor.this.mtaq; + ArrayBlockingQueue queue = ParallelProcessor.this.queue; //noinspection InfiniteLoopStatement while (true) { - // we want to continue, even if there is an error (until we decide to shutdown) + // we want to continue, even if there is an error (until we decide to shutdown). + // we get these threads to exit via `interrupt` try { while (!isShuttingDown) { - final T work = IN_QUEUE.take(); + final T work = queue.take(); doWork(work); } } catch (Throwable t) { if (!isShuttingDown) { - logger.error("Parallel task error!", t); + logger.error("Error during execution of parallel work!", t); } } } @@ -83,9 +90,9 @@ class ParallelProcessor { this.threads.add(runner); } -// for (Thread thread : threads) { -// thread.start(); -// } + for (Thread thread : threads) { + thread.start(); + } } private @@ -99,11 +106,13 @@ class ParallelProcessor { if (i == totalTaskCount) { isShuttingDown = true; + + // whichever thread finishes, is the one that runs workComplete + collector.workComplete(); + for (Thread thread : threads) { thread.interrupt(); } - // whichever thread finishes, is the one that runs workComplete - collector.workComplete(); } } @@ -116,6 +125,6 @@ class ParallelProcessor { public void execute(final T work) throws InterruptedException { - mtaq.transfer(work); + queue.put(work); } }