diff --git a/src/dorkbox/objectPool/ObjectPool.kt b/src/dorkbox/objectPool/ObjectPool.kt index 32c2d7f..3ed215d 100644 --- a/src/dorkbox/objectPool/ObjectPool.kt +++ b/src/dorkbox/objectPool/ObjectPool.kt @@ -17,11 +17,13 @@ package dorkbox.objectPool import com.conversantmedia.util.concurrent.DisruptorBlockingQueue import dorkbox.objectPool.blocking.BlockingPool +import dorkbox.objectPool.blocking.BlockingPoolCollection import dorkbox.objectPool.nonBlocking.BoundedNonBlockingPool import dorkbox.objectPool.nonBlocking.NonBlockingPool import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool import dorkbox.objectPool.suspending.ChannelQueue import dorkbox.objectPool.suspending.SuspendingPool +import dorkbox.objectPool.suspending.SuspendingPoolCollection import dorkbox.objectPool.suspending.SuspendingQueue import java.lang.ref.SoftReference import java.util.* @@ -70,7 +72,6 @@ object ObjectPool { return SuspendingPool(poolObject, size, queue) } - /** * Creates a high-performance blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a * [Pool.take] will wait for a corresponding [Pool.put]. @@ -201,4 +202,66 @@ object ObjectPool { fun nonBlockingBounded(poolObject: BoundedPoolObject, maxSize: Int, queue: Queue): Pool { return BoundedNonBlockingPool(poolObject, maxSize, queue) } + + + + + + + + + + /** + * Creates a suspending pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a + * [Pool.take] will wait for a corresponding [Pool.put]. + * + * @param poolObject controls the lifecycle of the pooled objects. + * @param size the size of the pool to create + * @param the type of object used in the pool + * + * @return a suspending pool using the kotlin Channel implementation of a specific size + */ + fun suspending(collection: Collection): dorkbox.objectPool.SuspendingPool { + return suspending(ChannelQueue(collection.size), collection) + } + + /** + * Creates a suspending pool of an existing collection, where the entire pool is initially filled, and when the pool is empty, a + * [Pool.take] will wait for a corresponding [Pool.put]. + * + * @param collection the existing collection to convert to a pool + * @param the type of object used in the pool + * + * @return a suspending pool using the kotlin Channel implementation of a specific size + */ + fun suspending(queue: SuspendingQueue, collection: Collection): dorkbox.objectPool.SuspendingPool { + return SuspendingPoolCollection(queue, collection) + } + + /** + * Creates a high-performance blocking pool of an existing collection, where the entire pool is initially filled, and when the pool is empty, a + * [Pool.take] will wait for a corresponding [Pool.put]. + * + * @param collection the existing collection to convert to a pool + * @param the type of object used in the pool + * + * @return a blocking pool using the DisruptorBlockingQueue implementation of a specific size + */ + fun blocking(collection: Collection): Pool { + return blocking(DisruptorBlockingQueue(collection.size), collection) + } + + /** + * Creates a blocking pool from an existing collection, where the entire pool is initially filled, and when the pool is empty, a + * [Pool.take] will wait for a corresponding [Pool.put]. + * + * @param queue the blocking queue implementation to use + * @param collection the existing collection to convert to a pool + * @param the type of object used in the pool + * + * @return a blocking pool using the specified [BlockingQueue] implementation of a specific size + */ + fun blocking(queue: BlockingQueue, collection: Collection): Pool { + return BlockingPoolCollection(queue, collection) + } } diff --git a/src/dorkbox/objectPool/blocking/BlockingPoolCollection.kt b/src/dorkbox/objectPool/blocking/BlockingPoolCollection.kt new file mode 100644 index 0000000..4159e87 --- /dev/null +++ b/src/dorkbox/objectPool/blocking/BlockingPoolCollection.kt @@ -0,0 +1,76 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.objectPool.blocking + +import dorkbox.objectPool.Pool +import java.util.concurrent.* + +/** + * A blocking pool of a specific collection, where the entire pool is initially filled, and when the pool is empty, + * a [Pool.take] will wait for a corresponding [Pool.put]. + * + * @author dorkbox, llc + */ +internal class BlockingPoolCollection constructor( + private val queue: BlockingQueue, + collection: Collection) : Pool { + + private val dummyValue: T + + init { + dummyValue = collection.elementAt(0) + + for (it in collection) { + queue.offer(it) + } + } + + /** + * Takes an object from the pool, Blocks until an item is available in the pool. + * + * This method catches [InterruptedException] and discards it silently. + */ + override fun take(): T { + return try { + takeInterruptibly() + } catch (ignored: InterruptedException) { + dummyValue + } + } + + /** + * Takes an object from the pool, Blocks until an item is available in the pool. + * + * @throws InterruptedException + */ + override fun takeInterruptibly(): T { + return queue.take() + } + + /** + * Return object to the pool, waking the threads that have blocked during take() + */ + override fun put(`object`: T) { + queue.put(`object`) + } + + /** + * @return a new object instance created by the pool. + */ + override fun newInstance(): T { + return dummyValue + } +} diff --git a/src/dorkbox/objectPool/suspending/SuspendingPoolCollection.kt b/src/dorkbox/objectPool/suspending/SuspendingPoolCollection.kt new file mode 100644 index 0000000..1044a34 --- /dev/null +++ b/src/dorkbox/objectPool/suspending/SuspendingPoolCollection.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.objectPool.suspending + +import dorkbox.objectPool.Pool +import dorkbox.objectPool.SuspendingPool +import kotlinx.coroutines.runBlocking + +/** + * A suspending pool of a specific size, where the entire pool is initially filled, and when the pool is empty, + * a [Pool.take] will wait for a corresponding [Pool.put]. + * + * @author dorkbox, llc + */ +internal class SuspendingPoolCollection constructor( + private val queue: SuspendingQueue, + collection: Collection) : SuspendingPool { + + private val dummyValue: T = collection.elementAt(0) + + init { + runBlocking { + for (x in collection) { + queue.offer(x) + } + } + } + + /** + * Takes an object from the pool, Suspends until an item is available in the pool. + * + * This method catches [InterruptedException] and discards it silently. + */ + override suspend fun take(): T { + return try { + takeInterruptibly() + } catch (e: InterruptedException) { + dummyValue + } + } + + /** + * Takes an object from the pool, Suspends until an item is available in the pool. + * + * @throws InterruptedException + */ + override suspend fun takeInterruptibly(): T { + return queue.take() + } + + /** + * Return object to the pool, waking the threads that have suspended during take() + */ + override suspend fun put(`object`: T) { + queue.put(`object`) + } + + /** + * @return a new object instance created by the pool. + */ + override suspend fun newInstance(): T { + return dummyValue + } +} diff --git a/test/dorkbox/objectPool/BlockingTest.kt b/test/dorkbox/objectPool/PoolTest.kt similarity index 70% rename from test/dorkbox/objectPool/BlockingTest.kt rename to test/dorkbox/objectPool/PoolTest.kt index b92e257..827fcd4 100644 --- a/test/dorkbox/objectPool/BlockingTest.kt +++ b/test/dorkbox/objectPool/PoolTest.kt @@ -17,12 +17,11 @@ package dorkbox.objectPool import kotlinx.coroutines.runBlocking import org.junit.Assert -import org.junit.Ignore import org.junit.Test @Suppress("UNUSED_VARIABLE") -@Ignore -class BlockingTest { +class PoolTest { + @Test fun blockingTest() { val pobj = object : PoolObject() { @@ -37,8 +36,12 @@ class BlockingTest { val take1 = pool.take() val take2 = pool.take() val take3 = pool.take() - val take4 = pool.take() // this blocks - Assert.fail("shouldn't get here") +// val take4 = pool.take() // this blocks +// Assert.fail("shouldn't get here") + + pool.put(take2) + val take4 = pool.take() + Assert.assertEquals(take2, take4) } @Test @@ -112,4 +115,42 @@ class BlockingTest { Assert.assertTrue(pool.take() === take3) } } + + @Test + fun collectionBlockingTest() { + val collection = listOf(1, 2, 3, 4) + + val pool = ObjectPool.blocking(collection) + + val take = pool.take() + val take1 = pool.take() + val take2 = pool.take() + val take3 = pool.take() +// val take4 = pool.take() // this suspends +// Assert.fail("shouldn't get here") + + pool.put(take2) + val take4 = pool.take() + Assert.assertEquals(take2, take4) + } + + @Test + fun collectionSuspendTest() { + val collection = listOf(1, 2, 3, 4) + + val pool = ObjectPool.suspending(collection) + + runBlocking { + val take = pool.take() + val take1 = pool.take() + val take2 = pool.take() + val take3 = pool.take() +// val take4 = pool.take() // this suspends +// Assert.fail("shouldn't get here") + + pool.put(take2) + val take4 = pool.take() + Assert.assertEquals(take2, take4) + } + } }