Merge branch '1_BoundedNonBlockingPool' of https://github.com/net-ref/ObjectPool into net-ref-1_BoundedNonBlockingPool
This commit is contained in:
commit
0db4fcde44
|
@ -0,0 +1,8 @@
|
|||
package dorkbox.objectPool
|
||||
|
||||
abstract class BoundedPoolObject<T>: PoolObject<T>() {
|
||||
/**
|
||||
* Called when an object is removed from the pool. Useful for logging how many objects are being removed
|
||||
*/
|
||||
open fun onRemoval(`object`: T) {}
|
||||
}
|
|
@ -17,6 +17,7 @@ package dorkbox.objectPool
|
|||
|
||||
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
|
||||
import dorkbox.objectPool.blocking.BlockingPool
|
||||
import dorkbox.objectPool.nonBlocking.BoundedNonBlockingPool
|
||||
import dorkbox.objectPool.nonBlocking.NonBlockingPool
|
||||
import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool
|
||||
import dorkbox.objectPool.suspending.SuspendingPool
|
||||
|
@ -143,4 +144,37 @@ object ObjectPool {
|
|||
fun <T> nonBlockingSoftReference(poolObject: PoolObject<T>, queue: Queue<SoftReference<T>>): Pool<T> {
|
||||
return NonBlockingSoftPool(poolObject, queue)
|
||||
}
|
||||
|
||||
/**
|
||||
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
|
||||
* If the pool is empty, new objects will be created.
|
||||
* The items added to pool will never expire or be automatically garbage collected.
|
||||
* The items not added back to the pool will be garbage collected
|
||||
*
|
||||
* @param poolObject controls the lifecycle of the pooled objects.
|
||||
* @param maxSize controls the maxSize the pool can be
|
||||
* @param <T> the type of object used in the pool
|
||||
*
|
||||
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
|
||||
*/
|
||||
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long): Pool<T> {
|
||||
return BoundedNonBlockingPool(poolObject, maxSize)
|
||||
}
|
||||
|
||||
/**
|
||||
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
|
||||
* If the pool is empty, new objects will be created.
|
||||
* The items added to pool will never expire or be automatically garbage collected.
|
||||
* The items not added back to the pool will be garbage collected
|
||||
*
|
||||
* @param poolObject controls the lifecycle of the pooled objects.
|
||||
* @param maxSize controls the maxSize the pool can be
|
||||
* @param queue the queue implementation to use
|
||||
* @param <T> the type of object used in the pool
|
||||
*
|
||||
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
|
||||
*/
|
||||
fun <T> nonBlockingBounded(poolObject: BoundedPoolObject<T>, maxSize: Long, queue: Queue<T>): Pool<T> {
|
||||
return BoundedNonBlockingPool(poolObject, maxSize, queue)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package dorkbox.objectPool.nonBlocking
|
||||
|
||||
import dorkbox.objectPool.BoundedPoolObject
|
||||
import java.util.Queue
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
/**
|
||||
* A non-blocking pool which will create as many objects as much as needed but will only store maxSize in the pool.
|
||||
* If the pool is empty, new objects will be created.
|
||||
* The items added to pool will never expire or be automatically garbage collected.
|
||||
* The items not added back to the pool will be garbage collected
|
||||
* See [ObjectPool.NonBlockingSoftReference] for pooled objectsthat will expire/GC as needed
|
||||
*
|
||||
* @author dorkbox, llc
|
||||
*/
|
||||
|
||||
internal class BoundedNonBlockingPool<T>(
|
||||
private val poolObject: BoundedPoolObject<T>,
|
||||
private val maxSize: Long,
|
||||
private val queue: Queue<T> = ConcurrentLinkedQueue()) : NonBlockingPool<T>(poolObject, queue) {
|
||||
|
||||
private val currentSize: AtomicLong = AtomicLong(0)
|
||||
|
||||
/**
|
||||
* Return object to the pool, waking the threads that have blocked during take()
|
||||
* If current pool size is larger then max size, don't add object back into the pool
|
||||
*/
|
||||
override fun put(`object`: T) {
|
||||
if (currentSize.get() <= maxSize) {
|
||||
poolObject.onReturn(`object`)
|
||||
queue.offer(`object`)
|
||||
} else {
|
||||
currentSize.decrementAndGet()
|
||||
|
||||
poolObject.onRemoval(`object`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a new object instance created by the pool.
|
||||
*/
|
||||
override fun newInstance(): T {
|
||||
currentSize.incrementAndGet()
|
||||
|
||||
return poolObject.newInstance()
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
|
|||
*
|
||||
* @author dorkbox, llc
|
||||
*/
|
||||
internal class NonBlockingPool<T>(
|
||||
internal open class NonBlockingPool<T>(
|
||||
private val poolObject: PoolObject<T>,
|
||||
private val queue: Queue<T> = ConcurrentLinkedQueue()) : Pool<T> {
|
||||
|
||||
|
@ -47,7 +47,7 @@ internal class NonBlockingPool<T>(
|
|||
override fun takeInterruptibly(): T {
|
||||
var take = queue.poll()
|
||||
if (take == null) {
|
||||
take = poolObject.newInstance()
|
||||
take = newInstance()
|
||||
}
|
||||
poolObject.onTake(take)
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ class BlockingTest {
|
|||
fun blockingTest() {
|
||||
val pobj = object : PoolObject<String>() {
|
||||
override fun newInstance(): String {
|
||||
return ""
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ class BlockingTest {
|
|||
fun nonblockingTest() {
|
||||
val pobj = object : PoolObject<String>() {
|
||||
override fun newInstance(): String {
|
||||
return ""
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,37 @@ class BlockingTest {
|
|||
val take4 = pool.take() // this does not block
|
||||
}
|
||||
|
||||
@Test
|
||||
fun nonBlockingBoundedTest() {
|
||||
var removed = 0
|
||||
|
||||
val pobj = object : BoundedPoolObject<String>() {
|
||||
override fun onRemoval(`object`: String) {
|
||||
removed++
|
||||
}
|
||||
|
||||
override fun newInstance(): String {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
val pool = ObjectPool.nonBlockingBounded(pobj, 2)
|
||||
|
||||
val take = pool.take()
|
||||
val take1 = pool.take()
|
||||
val take2 = pool.take()
|
||||
val take3 = pool.take()
|
||||
val take4 = pool.take() // this does not block
|
||||
|
||||
pool.put(take)
|
||||
pool.put(take1)
|
||||
pool.put(take2)
|
||||
pool.put(take3)
|
||||
pool.put(take4)
|
||||
|
||||
Assert.assertEquals(3, removed)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun suspendTest() {
|
||||
val pobj = object : SuspendingPoolObject<String>() {
|
||||
|
|
Loading…
Reference in New Issue