From ea07b7a48ed236cd635a9d9a4a368c83d75b2d3e Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 28 Jun 2015 13:00:03 +0200 Subject: [PATCH] Added parallel work utilities --- .../util/parallel/ParallelCollector.java | 18 +++ .../util/parallel/ParallelProcessor.java | 109 ++++++++++++++++++ .../dorkbox/util/parallel/ParallelTask.java | 19 +++ 3 files changed, 146 insertions(+) create mode 100644 Dorkbox-Util/src/dorkbox/util/parallel/ParallelCollector.java create mode 100644 Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java create mode 100644 Dorkbox-Util/src/dorkbox/util/parallel/ParallelTask.java diff --git a/Dorkbox-Util/src/dorkbox/util/parallel/ParallelCollector.java b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelCollector.java new file mode 100644 index 0000000..d1d8db2 --- /dev/null +++ b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelCollector.java @@ -0,0 +1,18 @@ +package dorkbox.util.parallel; + +/** + * + */ +public +interface ParallelCollector { + /** + * Called each time a single piece of work (a task) is completed. + */ + void taskComplete(T work); + + /** + * Called when everything is finished. + */ + void workComplete(); + +} diff --git a/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java new file mode 100644 index 0000000..28f61dd --- /dev/null +++ b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelProcessor.java @@ -0,0 +1,109 @@ +package dorkbox.util.parallel; + +import dorkbox.util.messagebus.common.thread.NamedThreadFactory; +import dorkbox.util.objectPool.ObjectPool; +import dorkbox.util.objectPool.ObjectPoolFactory; +import dorkbox.util.objectPool.PoolableObject; +import org.jctools.queues.MpmcTransferArrayQueue; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + */ +public +class ParallelProcessor { + + private final MpmcTransferArrayQueue mtaq; + private final ObjectPool pool; + private final int totalTaskCount; + private final ParallelCollector collector; + private final ArrayList threads; + private final AtomicInteger processedCount; + private final AtomicInteger assignedCount; + + private volatile boolean isShuttingDown = false; + + + public + ParallelProcessor(final PoolableObject poolableObject, final int numberOfThreads, final int totalTaskCount, final Logger logger, + final ParallelCollector collector) { + + this.totalTaskCount = totalTaskCount - 1; // x-1 because our ID's start at 0, not 1 + this.collector = collector; + + mtaq = new MpmcTransferArrayQueue(numberOfThreads); + pool = ObjectPoolFactory.create(poolableObject, numberOfThreads); + this.processedCount = new AtomicInteger(); + this.assignedCount = new AtomicInteger(); + + threads = new ArrayList(numberOfThreads); + + NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("ParallelProcessor"); + for (int i = 0; i < numberOfThreads; i++) { + Runnable runnable = new Runnable() { + @Override + public + void run() { + MpmcTransferArrayQueue IN_QUEUE = ParallelProcessor.this.mtaq; + + //noinspection InfiniteLoopStatement + while (true) { + // we want to continue, even if there is an error (until we decide to shutdown) + try { + while (!isShuttingDown) { + final T work = IN_QUEUE.take(); + doWork(work); + } + } catch (Throwable t) { + if (!isShuttingDown) { + logger.error("Parallel task error!", t); + } + } + } + } + }; + + + Thread runner = dispatchThreadFactory.newThread(runnable); + this.threads.add(runner); + } + +// for (Thread thread : threads) { +// thread.start(); +// } + } + + private + void doWork(final T work) { + // this does the work, and stores the result + work.doWork(); + collector.taskComplete(work); + + final int i = processedCount.getAndIncrement(); + pool.release(work); // last valid position for 'work', since this releases it back so the client can reuse it + + if (i == totalTaskCount) { + isShuttingDown = true; + for (Thread thread : threads) { + thread.interrupt(); + } + // whichever thread finishes, is the one that runs workComplete + collector.workComplete(); + } + } + + public + T next() throws InterruptedException { + final T take = pool.take(); + take.setId(assignedCount.getAndIncrement()); + return take; + } + + public + void execute(final T work) throws InterruptedException { + mtaq.transfer(work); + } +} diff --git a/Dorkbox-Util/src/dorkbox/util/parallel/ParallelTask.java b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelTask.java new file mode 100644 index 0000000..d0f4980 --- /dev/null +++ b/Dorkbox-Util/src/dorkbox/util/parallel/ParallelTask.java @@ -0,0 +1,19 @@ +package dorkbox.util.parallel; + +public abstract +class ParallelTask { + public int index; + + public + void setId(int index) { + this.index = index; + } + + public + int getId() { + return index; + } + + public abstract + void doWork(); +}