Added support for converting collections to a blocking/suspending pool

master
Robinson 2022-06-29 16:07:54 +02:00
parent c8a4639ad9
commit 05dd096b31
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 263 additions and 6 deletions

View File

@ -17,11 +17,13 @@ package dorkbox.objectPool
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import dorkbox.objectPool.blocking.BlockingPool
import dorkbox.objectPool.blocking.BlockingPoolCollection
import dorkbox.objectPool.nonBlocking.BoundedNonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool
import dorkbox.objectPool.suspending.ChannelQueue
import dorkbox.objectPool.suspending.SuspendingPool
import dorkbox.objectPool.suspending.SuspendingPoolCollection
import dorkbox.objectPool.suspending.SuspendingQueue
import java.lang.ref.SoftReference
import java.util.*
@ -70,7 +72,6 @@ object ObjectPool {
return SuspendingPool(poolObject, size, queue)
}
/**
* Creates a high-performance blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
@ -201,4 +202,66 @@ object ObjectPool {
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Int, queue: Queue<T>): Pool<T> {
return BoundedNonBlockingPool(poolObject, maxSize, queue)
}
/**
* Creates a suspending pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a suspending pool using the kotlin Channel implementation of a specific size
*/
fun <T> suspending(collection: Collection<T>): dorkbox.objectPool.SuspendingPool<T> {
return suspending(ChannelQueue(collection.size), collection)
}
/**
* Creates a suspending pool of an existing collection, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param collection the existing collection to convert to a pool
* @param <T> the type of object used in the pool
*
* @return a suspending pool using the kotlin Channel implementation of a specific size
*/
fun <T> suspending(queue: SuspendingQueue<T>, collection: Collection<T>): dorkbox.objectPool.SuspendingPool<T> {
return SuspendingPoolCollection(queue, collection)
}
/**
* Creates a high-performance blocking pool of an existing collection, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param collection the existing collection to convert to a pool
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the DisruptorBlockingQueue implementation of a specific size
*/
fun <T> blocking(collection: Collection<T>): Pool<T> {
return blocking(DisruptorBlockingQueue(collection.size), collection)
}
/**
* Creates a blocking pool from an existing collection, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param queue the blocking queue implementation to use
* @param collection the existing collection to convert to a pool
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the specified [BlockingQueue] implementation of a specific size
*/
fun <T> blocking(queue: BlockingQueue<T>, collection: Collection<T>): Pool<T> {
return BlockingPoolCollection(queue, collection)
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool.blocking
import dorkbox.objectPool.Pool
import java.util.concurrent.*
/**
* A blocking pool of a specific collection, where the entire pool is initially filled, and when the pool is empty,
* a [Pool.take] will wait for a corresponding [Pool.put].
*
* @author dorkbox, llc
*/
internal class BlockingPoolCollection<T> constructor(
private val queue: BlockingQueue<T>,
collection: Collection<T>) : Pool<T> {
private val dummyValue: T
init {
dummyValue = collection.elementAt(0)
for (it in collection) {
queue.offer(it)
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
override fun take(): T {
return try {
takeInterruptibly()
} catch (ignored: InterruptedException) {
dummyValue
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* @throws InterruptedException
*/
override fun takeInterruptibly(): T {
return queue.take()
}
/**
* Return object to the pool, waking the threads that have blocked during take()
*/
override fun put(`object`: T) {
queue.put(`object`)
}
/**
* @return a new object instance created by the pool.
*/
override fun newInstance(): T {
return dummyValue
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool.suspending
import dorkbox.objectPool.Pool
import dorkbox.objectPool.SuspendingPool
import kotlinx.coroutines.runBlocking
/**
* A suspending pool of a specific size, where the entire pool is initially filled, and when the pool is empty,
* a [Pool.take] will wait for a corresponding [Pool.put].
*
* @author dorkbox, llc
*/
internal class SuspendingPoolCollection<T> constructor(
private val queue: SuspendingQueue<T>,
collection: Collection<T>) : SuspendingPool<T> {
private val dummyValue: T = collection.elementAt(0)
init {
runBlocking {
for (x in collection) {
queue.offer(x)
}
}
}
/**
* Takes an object from the pool, Suspends until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
override suspend fun take(): T {
return try {
takeInterruptibly()
} catch (e: InterruptedException) {
dummyValue
}
}
/**
* Takes an object from the pool, Suspends until an item is available in the pool.
*
* @throws InterruptedException
*/
override suspend fun takeInterruptibly(): T {
return queue.take()
}
/**
* Return object to the pool, waking the threads that have suspended during take()
*/
override suspend fun put(`object`: T) {
queue.put(`object`)
}
/**
* @return a new object instance created by the pool.
*/
override suspend fun newInstance(): T {
return dummyValue
}
}

View File

@ -17,12 +17,11 @@ package dorkbox.objectPool
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Ignore
import org.junit.Test
@Suppress("UNUSED_VARIABLE")
@Ignore
class BlockingTest {
class PoolTest {
@Test
fun blockingTest() {
val pobj = object : PoolObject<String>() {
@ -37,8 +36,12 @@ class BlockingTest {
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
val take4 = pool.take() // this blocks
Assert.fail("shouldn't get here")
// val take4 = pool.take() // this blocks
// Assert.fail("shouldn't get here")
pool.put(take2)
val take4 = pool.take()
Assert.assertEquals(take2, take4)
}
@Test
@ -112,4 +115,42 @@ class BlockingTest {
Assert.assertTrue(pool.take() === take3)
}
}
@Test
fun collectionBlockingTest() {
val collection = listOf(1, 2, 3, 4)
val pool = ObjectPool.blocking(collection)
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
// val take4 = pool.take() // this suspends
// Assert.fail("shouldn't get here")
pool.put(take2)
val take4 = pool.take()
Assert.assertEquals(take2, take4)
}
@Test
fun collectionSuspendTest() {
val collection = listOf(1, 2, 3, 4)
val pool = ObjectPool.suspending(collection)
runBlocking {
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
// val take4 = pool.take() // this suspends
// Assert.fail("shouldn't get here")
pool.put(take2)
val take4 = pool.take()
Assert.assertEquals(take2, take4)
}
}
}