Merge branch 'net-ref-1_BoundedNonBlockingPool'

This commit is contained in:
nathan 2020-10-20 00:53:49 +02:00
commit 3e12e703ca
5 changed files with 125 additions and 4 deletions

View File

@ -0,0 +1,8 @@
package dorkbox.objectPool
abstract class BoundedPoolObject<T>: PoolObject<T>() {
/**
* Called when an object is removed from the pool. Useful for logging how many objects are being removed
*/
open fun onRemoval(`object`: T) {}
}

View File

@ -17,6 +17,7 @@ package dorkbox.objectPool
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import dorkbox.objectPool.blocking.BlockingPool
import dorkbox.objectPool.nonBlocking.BoundedNonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool
import dorkbox.objectPool.suspending.SuspendingPool
@ -143,4 +144,37 @@ object ObjectPool {
fun <T> nonBlockingSoftReference(poolObject: PoolObject<T>, queue: Queue<SoftReference<T>>): Pool<T> {
return NonBlockingSoftPool(poolObject, queue)
}
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param maxSize controls the maxSize the pool can be
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long): Pool<T> {
return BoundedNonBlockingPool(poolObject, maxSize)
}
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param maxSize controls the maxSize the pool can be
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long, queue: Queue<T>): Pool<T> {
return BoundedNonBlockingPool(poolObject, maxSize, queue)
}
}

View File

@ -0,0 +1,48 @@
package dorkbox.objectPool.nonBlocking
import dorkbox.objectPool.BoundedPoolObject
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
/**
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
* If the pool is empty, new objects will be created.
* The items added to pool will never expire or be automatically garbage collected.
* The items not added back to the pool will be garbage collected
* See [ObjectPool.NonBlockingSoftReference] for pooled objectsthat will expire/GC as needed
*
* @author dorkbox, llc
*/
internal class BoundedNonBlockingPool<T>(
private val poolObject: BoundedPoolObject<T>,
private val maxSize: Long,
private val queue: Queue<T> = ConcurrentLinkedQueue()) : NonBlockingPool<T>(poolObject, queue) {
private val currentSize: AtomicLong = AtomicLong(0)
/**
* Return object to the pool, waking the threads that have blocked during take()
* If current pool size is larger then max size, don't add object back into the pool
*/
override fun put(`object`: T) {
if (currentSize.get() <= maxSize) {
poolObject.onReturn(`object`)
queue.offer(`object`)
} else {
currentSize.decrementAndGet()
poolObject.onRemoval(`object`)
}
}
/**
* @return a new object instance created by the pool.
*/
override fun newInstance(): T {
currentSize.incrementAndGet()
return poolObject.newInstance()
}
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
*
* @author dorkbox, llc
*/
internal class NonBlockingPool<T>(
internal open class NonBlockingPool<T>(
private val poolObject: PoolObject<T>,
private val queue: Queue<T> = ConcurrentLinkedQueue()) : Pool<T> {
@ -47,7 +47,7 @@ internal class NonBlockingPool<T>(
override fun takeInterruptibly(): T {
var take = queue.poll()
if (take == null) {
take = poolObject.newInstance()
take = newInstance()
}
poolObject.onTake(take)

View File

@ -26,7 +26,7 @@ class BlockingTest {
fun blockingTest() {
val pobj = object : PoolObject<String>() {
override fun newInstance(): String {
return ""
return ""
}
}
@ -44,7 +44,7 @@ class BlockingTest {
fun nonblockingTest() {
val pobj = object : PoolObject<String>() {
override fun newInstance(): String {
return ""
return ""
}
}
@ -57,6 +57,37 @@ class BlockingTest {
val take4 = pool.take() // this does not block
}
@Test
fun nonBlockingBoundedTest() {
var removed = 0
val pobj = object : BoundedPoolObject<String>() {
override fun onRemoval(`object`: String) {
removed++
}
override fun newInstance(): String {
return ""
}
}
val pool = ObjectPool.nonBlockingBounded(pobj, 2)
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
val take4 = pool.take() // this does not block
pool.put(take)
pool.put(take1)
pool.put(take2)
pool.put(take3)
pool.put(take4)
Assert.assertEquals(3, removed)
}
@Test
fun suspendTest() {
val pobj = object : SuspendingPoolObject<String>() {