Better interaction with suspending pools, to permit blocking put if (but only if needed)

master
Robinson 2023-09-06 21:43:31 +02:00
parent 228f9781ed
commit 8ecdfe2c2d
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 48 additions and 9 deletions

View File

@ -21,6 +21,11 @@ abstract class SuspendingPoolObject<T: Any> {
*/
open suspend fun onReturn(`object`: T) {}
/**
* Called when an object is returned to the pool, blocking if necessary. Useful for resetting an objects state, for example.
*/
open fun onReturnBlocking(`object`: T) {}
/**
* Called when an object is taken from the pool, useful for setting an objects state, for example.
*/

View File

@ -17,29 +17,30 @@
package dorkbox.objectPool.suspending
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
/**
* Wraps a Kotlin channel into a LIMITED queue implementations
*/
class ChannelQueue<E>(size: Int): SuspendingQueue<E> {
internal class ChannelQueue<E: Any>(size: Int): SuspendingQueue<E> {
private val channel = Channel<E>(size)
override fun offer(element: E): Boolean {
override inline fun offer(element: E): Boolean {
val result = channel.trySend(element)
return result.isSuccess
}
override fun remove(): E {
override inline fun remove(): E {
val tryReceive = channel.tryReceive()
return tryReceive.getOrNull() ?: throw NoSuchElementException("Channel is empty")
}
override fun poll(): E? {
override inline fun poll(): E? {
val tryReceive = channel.tryReceive()
return tryReceive.getOrNull()
}
override fun add(element: E): Boolean {
override inline fun add(element: E): Boolean {
val result = channel.trySend(element)
if (result.isSuccess) {
return true
@ -48,15 +49,19 @@ class ChannelQueue<E>(size: Int): SuspendingQueue<E> {
}
}
override suspend fun put(element: E) {
override suspend inline fun put(element: E) {
channel.send(element)
}
override suspend fun take(): E {
override inline fun putBlocking(element: E) {
channel.trySendBlocking(element)
}
override inline suspend fun take(): E {
return channel.receive()
}
override fun close() {
override inline fun close() {
channel.close()
}
}

View File

@ -48,7 +48,9 @@ internal class SuspendingPool<T: Any>(
*/
override suspend fun take(): T {
return try {
takeInterruptibly()
val take = queue.take()
poolObject.onTake(take)
take
} catch (e: InterruptedException) {
val newInstance = newInstance()
poolObject.onTake(newInstance)
@ -76,6 +78,14 @@ internal class SuspendingPool<T: Any>(
queue.put(`object`)
}
/**
* Return object to the pool, blocking if necessary, and waking the threads that have suspended during take()
*/
override fun putBlocking(`object`: T) {
poolObject.onReturnBlocking(`object`)
queue.putBlocking(`object`)
}
/**
* @return a new object instance created by the pool.
*/

View File

@ -68,6 +68,13 @@ internal class SuspendingPoolCollection<T: Any>(
queue.put(`object`)
}
/**
* Return object to the pool, blocking if necessary, and waking the threads that have suspended during take()
*/
override fun putBlocking(`object`: T) {
queue.putBlocking(`object`)
}
/**
* @return a new object instance created by the pool.
*/

View File

@ -64,6 +64,18 @@ interface SuspendingQueue<E: Any> {
@Throws(InterruptedException::class)
suspend fun put(element: E)
/**
* Inserts the specified element into this queue, blocking if necessary for space to become available.
*
* @param element the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified element prevents it from being added to this queue
*/
@Throws(InterruptedException::class)
fun putBlocking(element: E)
/**
* Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
*