Cleaned up ObjectPool constuctors/interfaces. Added SuspendingQueue as a type for the SuspendingPool (so we can now provide our own implementations)

master
Robinson 2022-04-03 15:02:17 +02:00
parent 0666fe6e8b
commit 1ce58f38af
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
7 changed files with 158 additions and 23 deletions

View File

@ -20,11 +20,12 @@ import dorkbox.objectPool.blocking.BlockingPool
import dorkbox.objectPool.nonBlocking.BoundedNonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool
import dorkbox.objectPool.suspending.ChannelQueue
import dorkbox.objectPool.suspending.SuspendingPool
import dorkbox.objectPool.suspending.SuspendingQueue
import java.lang.ref.SoftReference
import java.util.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.*
/**
* @author dorkbox, llc
@ -51,9 +52,25 @@ object ObjectPool {
* @return a suspending pool using the kotlin Channel implementation of a specific size
*/
fun <T> suspending(poolObject: SuspendingPoolObject<T>, size: Int): dorkbox.objectPool.SuspendingPool<T> {
return SuspendingPool(poolObject, size)
return suspending(poolObject, size, ChannelQueue(size))
}
/**
* Creates a suspending pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a suspending pool using the kotlin Channel implementation of a specific size
*/
fun <T> suspending(poolObject: SuspendingPoolObject<T>, size: Int, queue: SuspendingQueue<T>): dorkbox.objectPool.SuspendingPool<T> {
return SuspendingPool(poolObject, size, queue)
}
/**
* Creates a high-performance blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
@ -95,7 +112,7 @@ object ObjectPool {
* @return a blocking pool using the default [ConcurrentLinkedQueue] implementation
*/
fun <T> nonBlocking(poolObject: PoolObject<T>): Pool<T> {
return NonBlockingPool(poolObject)
return nonBlocking(poolObject, ConcurrentLinkedQueue())
}
/**
@ -129,7 +146,7 @@ object ObjectPool {
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingSoftReference(poolObject: PoolObject<T>): Pool<T> {
return NonBlockingSoftPool(poolObject)
return nonBlockingSoftReference(poolObject, ConcurrentLinkedQueue())
}
/**
@ -153,6 +170,7 @@ object ObjectPool {
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
*
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
*
@ -162,24 +180,25 @@ object ObjectPool {
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long): Pool<T> {
return BoundedNonBlockingPool(poolObject, maxSize)
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Int): Pool<T> {
return nonBlockingBounded(poolObject, maxSize, DisruptorBlockingQueue(maxSize))
}
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
*
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param maxSize controls the maxSize the pool can be
* @param queue the queue implementation to use
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long, queue: Queue<T>): Pool<T> {
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Int, queue: Queue<T>): Pool<T> {
return BoundedNonBlockingPool(poolObject, maxSize, queue)
}
}

View File

@ -16,13 +16,13 @@
package dorkbox.objectPool.nonBlocking
import dorkbox.objectPool.BoundedPoolObject
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.*
import java.util.concurrent.atomic.*
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
*
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
*
@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class BoundedNonBlockingPool<T>(
private val poolObject: BoundedPoolObject<T>,
private val maxSize: Long,
private val queue: Queue<T> = ConcurrentLinkedQueue()) : NonBlockingPool<T>(poolObject, queue) {
private val maxSize: Int,
private val queue: Queue<T>) : NonBlockingPool<T>(poolObject, queue) {
private val currentSize: AtomicLong = AtomicLong(0)

View File

@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
*/
internal open class NonBlockingPool<T>(
private val poolObject: PoolObject<T>,
private val queue: Queue<T> = ConcurrentLinkedQueue()) : Pool<T> {
private val queue: Queue<T>) : Pool<T> {
/**
* Takes an object from the pool, if there is no object available, will create a new object.

View File

@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
*/
internal class NonBlockingSoftPool<T>(
private val poolObject: PoolObject<T>,
private val queue: Queue<SoftReference<T>> = ConcurrentLinkedQueue()) : Pool<T> {
private val queue: Queue<SoftReference<T>>) : Pool<T> {
/**
* Takes an object from the pool, if there is no object available, will create a new object.

View File

@ -0,0 +1,42 @@
package dorkbox.objectPool.suspending
import kotlinx.coroutines.channels.Channel
/**
* Wraps a Kotlin channel into a LIMITED queue implementations
*/
class ChannelQueue<E>(size: Int): SuspendingQueue<E> {
private val channel = Channel<E>(size)
override fun offer(element: E): Boolean {
val result = channel.trySend(element)
return result.isSuccess
}
override fun remove(): E {
val tryReceive = channel.tryReceive()
return tryReceive.getOrNull() ?: throw NoSuchElementException("Channel is empty")
}
override fun poll(): E? {
val tryReceive = channel.tryReceive()
return tryReceive.getOrNull()
}
override fun add(element: E): Boolean {
val result = channel.trySend(element)
if (result.isSuccess) {
return true
} else {
throw IllegalStateException("Channel is full.")
}
}
override suspend fun put(element: E) {
channel.send(element)
}
override suspend fun take(): E {
return channel.receive()
}
}

View File

@ -18,7 +18,6 @@ package dorkbox.objectPool.suspending
import dorkbox.objectPool.Pool
import dorkbox.objectPool.SuspendingPool
import dorkbox.objectPool.SuspendingPoolObject
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
/**
@ -29,16 +28,15 @@ import kotlinx.coroutines.runBlocking
*/
internal class SuspendingPool<T> constructor(
private val poolObject: SuspendingPoolObject<T>,
size: Int) : SuspendingPool<T> {
private val channel = Channel<T>(size)
size: Int,
private val queue: SuspendingQueue<T>) : SuspendingPool<T> {
init {
runBlocking {
for (x in 0 until size) {
val e = newInstance()
poolObject.onReturn(e)
channel.trySend(e)
queue.offer(e)
}
}
}
@ -64,7 +62,7 @@ internal class SuspendingPool<T> constructor(
* @throws InterruptedException
*/
override suspend fun takeInterruptibly(): T {
val take = channel.receive()
val take = queue.take()
poolObject.onTake(take)
return take
@ -75,7 +73,7 @@ internal class SuspendingPool<T> constructor(
*/
override suspend fun put(`object`: T) {
poolObject.onReturn(`object`)
channel.send(`object`)
queue.put(`object`)
}
/**

View File

@ -0,0 +1,76 @@
package dorkbox.objectPool.suspending
import java.util.Collection
import java.util.concurrent.*
/**
* A limited version of a [BlockingQueue] that suspends instead of blocking
*/
interface SuspendingQueue<E> {
/**
* Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning
* `true` upon success and throwing an `IllegalStateException` if no space is currently available.
*
* When using a capacity-restricted queue, it is generally preferable to use [offer][.offer].
*
* @param element the element to add
* @return `true` (as specified by [Collection.add])
* @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
* @throws ClassCastException if the class of the specified element prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified element prevents it from being added to this queue
*/
fun add(element: E): Boolean
/**
* Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning
* `true` upon success and `false` if no space is currently available.
*
* When using a capacity-restricted queue, this method is generally preferable to [.add], which can fail to insert an
* element only by throwing an exception.
*
* @param element the element to add
* @return `true` if the element was added to this queue, else `false`
* @throws ClassCastException if the class of the specified element prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified element prevents it from being added to this queue
*/
fun offer(element: E): Boolean
/**
* Inserts the specified element into this queue, waiting if necessary for space to become available.
*
* @param element the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified element prevents it from being added to this queue
*/
@Throws(InterruptedException::class)
suspend fun put(element: E)
/**
* Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
@Throws(InterruptedException::class)
suspend fun take(): E
/**
* Retrieves and removes the head of this queue. This method differs from {@link #poll() poll()} only in that it throws an exception if
* this queue is empty.
*
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
fun remove(): E
/**
* Retrieves and removes the head of this queue, or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
fun poll(): E?
}