diff --git a/src/dorkbox/util/MBus.java b/src/dorkbox/util/MBus.java deleted file mode 100644 index 6670a2b..0000000 --- a/src/dorkbox/util/MBus.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util; - -import dorkbox.messagebus.IMessageBus; -import dorkbox.messagebus.MessageBus; -import dorkbox.messagebus.error.IPublicationErrorHandler; -import dorkbox.messagebus.error.PublicationError; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public -class MBus { - - public static final IMessageBus bus; - private static final Logger logger = LoggerFactory.getLogger(MBus.class); - - - static { - MessageBus messageBus = new MessageBus(OS.getOptimumNumberOfThreads() * 2); - - IPublicationErrorHandler ExceptionCounter = new IPublicationErrorHandler() { - @Override - public - void handleError(PublicationError error) { - logger.error(error.toString()); - - if (error.getCause() != null) { - error.getCause() - .printStackTrace(); - } - } - - @Override - public - void handleError(final String error, final Class listenerClass) { - // Printout the error itself - logger.error(new StringBuilder().append(error) - .append(": ") - .append(listenerClass.getSimpleName()) - .toString()); - } - }; - - messageBus.addErrorHandler(ExceptionCounter); - bus = messageBus; - } - - private - MBus() { - } -} diff --git a/src/dorkbox/util/parallel/ParallelCollector.java b/src/dorkbox/util/parallel/ParallelCollector.java deleted file mode 100644 index decc492..0000000 --- a/src/dorkbox/util/parallel/ParallelCollector.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.parallel; - -public -interface ParallelCollector { - /** - * Called each time a single piece of work (a task) is completed. - */ - void taskComplete(T work); - - /** - * Called when everything is finished. - */ - void workComplete(); - -} diff --git a/src/dorkbox/util/parallel/ParallelProcessor.java b/src/dorkbox/util/parallel/ParallelProcessor.java deleted file mode 100644 index 9a8f55c..0000000 --- a/src/dorkbox/util/parallel/ParallelProcessor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.parallel; - -import dorkbox.util.NamedThreadFactory; -import dorkbox.objectPool.ObjectPool; -import dorkbox.objectPool.PoolableObject; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * This isn't the FASTEST implementation, but it is relatively easy and solid. Also minimal GC. - * @param - */ -public -class ParallelProcessor { - - private final ObjectPool pool; - private final int totalTaskCount; - private final ParallelCollector collector; - - private final ArrayList threads; - private final AtomicInteger processedCount; - private final AtomicInteger assignedCount; - - private final ArrayBlockingQueue queue; - - private volatile boolean isShuttingDown = false; - - - public - ParallelProcessor(final PoolableObject poolableObject, final int numberOfThreads, final int totalTaskCount, final Logger logger, - final ParallelCollector collector) { - - this.totalTaskCount = totalTaskCount - 1; // x-1 because our ID's start at 0, not 1 - this.collector = collector; - - pool = ObjectPool.Blocking(poolableObject, numberOfThreads); - this.processedCount = new AtomicInteger(); - this.assignedCount = new AtomicInteger(); - - queue = new ArrayBlockingQueue(numberOfThreads * 2); - threads = new ArrayList(numberOfThreads); - - ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread().getThreadGroup(), "ParallelProcessor"); - NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("Processor", threadGroup); - for (int i = 0; i < numberOfThreads; i++) { - Runnable runnable = new Runnable() { - @Override - public - void run() { - ArrayBlockingQueue queue = ParallelProcessor.this.queue; - - //noinspection InfiniteLoopStatement - while (true) { - // we want to continue, even if there is an error (until we decide to shutdown). - // we get these threads to exit via `interrupt` - try { - while (!isShuttingDown) { - final T work = queue.take(); - doWork(work); - } - } catch (Throwable t) { - if (!isShuttingDown) { - logger.error("Error during execution of parallel work!", t); - } - } - } - } - }; - - - Thread runner = dispatchThreadFactory.newThread(runnable); - this.threads.add(runner); - } - - for (Thread thread : threads) { - thread.start(); - } - } - - private - void doWork(final T work) { - // this does the work, and stores the result - work.doWork(); - collector.taskComplete(work); - - final int i = processedCount.getAndIncrement(); - pool.put(work); // last valid position for 'work', since this releases it back so the client can reuse it - - if (i == totalTaskCount) { - isShuttingDown = true; - - // whichever thread finishes, is the one that runs workComplete - collector.workComplete(); - - for (Thread thread : threads) { - thread.interrupt(); - } - } - } - - public - T next() throws InterruptedException { - final T take = pool.take(); - take.setId(assignedCount.getAndIncrement()); - return take; - } - - public - void execute(final T work) throws InterruptedException { - queue.put(work); - } -} diff --git a/src/dorkbox/util/parallel/ParallelTask.java b/src/dorkbox/util/parallel/ParallelTask.java deleted file mode 100644 index fa4d19d..0000000 --- a/src/dorkbox/util/parallel/ParallelTask.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.parallel; - -public abstract -class ParallelTask { - public int index; - - public - void setId(int index) { - this.index = index; - } - - public - int getId() { - return index; - } - - public abstract - void doWork(); -}