Changed library to kotlin. Added coroutine suspend support. Changed API slightly

This commit is contained in:
nathan 2020-10-05 19:25:21 +02:00
parent 4ffe9a77f3
commit 2aa9f12b39
16 changed files with 681 additions and 382 deletions

35
LICENSE
View File

@ -1,5 +1,32 @@
- ObjectPool -
- ObjectPool - Fast, lightweight, and compatible blocking/non-blocking/soft-reference object pool for Java 6+
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/ObjectPool
Copyright 2019 - The Apache Software License, Version 2.0
dorkbox, llc
Fast, lightweight, and compatible blocking/non-blocking/soft-reference object pool for Java 6+
Copyright 2020
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support
[The Apache Software License, Version 2.0]
https://github.com/Kotlin/kotlinx.coroutines
Copyright 2020
JetBrains s.r.o.
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2020
QOS.ch
- Conversant Disruptor - Disruptor is the highest performing intra-thread transfer mechanism available in Java.
[The Apache Software License, Version 2.0]
https://github.com/conversant/disruptor
Copyright 2020
Conversant, Inc

21
LICENSE.MIT Normal file
View File

@ -0,0 +1,21 @@
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -30,13 +30,13 @@ gradle.startParameter.warningMode = WarningMode.All
plugins {
java
id("com.dorkbox.GradleUtils") version "1.9"
id("com.dorkbox.Licensing") version "2.2"
id("com.dorkbox.GradleUtils") version "1.12"
id("com.dorkbox.Licensing") version "2.5.2"
id("com.dorkbox.VersionUpdate") version "2.0"
id("com.dorkbox.GradlePublish") version "1.4"
id("com.dorkbox.GradleModuleInfo") version "1.0"
id("com.dorkbox.GradlePublish") version "1.8"
id("com.dorkbox.GradleModuleInfo") version "1.1"
kotlin("jvm") version "1.3.72"
kotlin("jvm") version "1.4.10"
}
object Extras {
@ -81,6 +81,22 @@ sourceSets {
include("**/*.kt")
}
}
test {
java {
setSrcDirs(listOf("test"))
// want to include java files for the source. 'setSrcDirs' resets includes...
include("**/*.java")
}
kotlin {
setSrcDirs(listOf("test"))
// want to include java files for the source. 'setSrcDirs' resets includes...
include("**/*.java", "**/*.kt")
}
}
}
repositories {
@ -113,5 +129,7 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:${Extras.coroutineVer}")
implementation("org.slf4j:slf4j-api:1.7.30")
implementation("com.conversantmedia:disruptor:1.2.17")
implementation("com.conversantmedia:disruptor:1.2.19")
testImplementation("junit:junit:4.13")
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
import java.util.concurrent.BlockingQueue;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
/**
* A blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* {@link ObjectPool#take()} will wait for a corresponding {@link ObjectPool#put(Object)}.
*
* @author dorkbox, llc
*/
class BlockingPool<T> extends ObjectPool<T> {
private final BlockingQueue<T> queue;
private final PoolableObject<T> poolableObject;
BlockingPool(PoolableObject<T> poolableObject, int size) {
this(poolableObject, new DisruptorBlockingQueue<T>(size), size);
}
BlockingPool(final PoolableObject<T> poolableObject, final BlockingQueue<T> queue, final int size) {
this.poolableObject = poolableObject;
this.queue = queue;
for (int x = 0; x < size; x++) {
T e = poolableObject.create();
poolableObject.onReturn(e);
this.queue.add(e);
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
* <p/>
* This method catches {@link InterruptedException} and discards it silently.
*/
@Override
public
T take() {
try {
return takeInterruptibly();
} catch (InterruptedException e) {
return null;
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*/
@Override
public
T takeInterruptibly() throws InterruptedException {
final T take = this.queue.take();
poolableObject.onTake(take);
return take;
}
/**
* Return object to the pool, waking the threads that have blocked during take()
*/
@Override
public
void put(T object) {
poolableObject.onReturn(object);
this.queue.offer(object);
}
/**
* @return a new object instance created by the pool.
*/
@Override
public
T newInstance() {
return poolableObject.create();
}
}

View File

@ -1,89 +0,0 @@
/*
* Copyright 2016 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See {@link #NonBlocking(PoolableObject)}
* for pooled objects that will never expire).
* @author dorkbox, llc
*/
class NonBlockingSoftPool<T> extends ObjectPool<T> {
private final Queue<SoftReference<T>> queue;
private final PoolableObject<T> poolableObject;
NonBlockingSoftPool(final PoolableObject<T> poolableObject) {
this(poolableObject, new ConcurrentLinkedQueue<SoftReference<T>>());
}
NonBlockingSoftPool(final PoolableObject<T> poolableObject, final Queue<SoftReference<T>> queue) {
this.poolableObject = poolableObject;
this.queue = queue;
}
/**
* Takes an object from the pool.
*/
@Override
public
T take() {
T obj;
SoftReference<T> ref;
while((ref = queue.poll()) != null) {
if((obj = ref.get()) != null) {
poolableObject.onTake(obj);
return obj;
}
}
final T take = poolableObject.create();
poolableObject.onTake(take);
return take;
}
/**
* Takes an object from the pool.
*/
@Override
public
T takeInterruptibly() throws InterruptedException {
return take();
}
/**
* Return object to the pool.
*/
@Override
public
void put(T object) {
poolableObject.onReturn(object);
this.queue.offer(new SoftReference<T>(object));
}
/**
* @return a new object instance created by the pool.
*/
@Override
public
T newInstance() {
return poolableObject.create();
}
}

View File

@ -1,125 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
/**
* @author dorkbox, llc
*/
@SuppressWarnings("unused")
public abstract
class ObjectPool<T> implements Pool<T> {
/**
* Gets the version number.
*/
public static
String getVersion() {
return "2.12";
}
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* {@link ObjectPool#take()} will wait for a corresponding {@link ObjectPool#put(Object)}.
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
public static <T> ObjectPool<T> Blocking(final PoolableObject<T> poolableObject, final int size) {
return new BlockingPool<T>(poolableObject, size);
}
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* {@link ObjectPool#take()} will wait for a corresponding {@link ObjectPool#put(Object)}.
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param queue the blocking queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
public static <T> ObjectPool<T> Blocking(final PoolableObject<T> poolableObject, final BlockingQueue<T> queue, final int size) {
return new BlockingPool<T>(poolableObject, queue, size);
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will never expire or be automatically garbage collected. (see {@link #NonBlockingSoftReference(PoolableObject)} for pooled objects
* that will expire/GC as needed).
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
public static <T> ObjectPool<T> NonBlocking(final PoolableObject<T> poolableObject) {
return new NonBlockingPool<T>(poolableObject);
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will never expire or be automatically garbage collected. (see {@link #NonBlockingSoftReference(PoolableObject)} for pooled objects
* that will expire/GC as needed).
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
public static <T> ObjectPool<T> NonBlocking(final PoolableObject<T> poolableObject, final Queue<T> queue) {
return new NonBlockingPool<T>(poolableObject, queue);
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See {@link #NonBlocking(PoolableObject)}
* for pooled objects that will never expire).
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
public static <T> ObjectPool<T> NonBlockingSoftReference(final PoolableObject<T> poolableObject) {
return new NonBlockingSoftPool<T>(poolableObject);
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See {@link #NonBlocking(PoolableObject)}
* for pooled objects that will never expire).
*
* @param poolableObject controls the lifecycle of the pooled objects.
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
public static <T> ObjectPool<T> NonBlockingSoftReference(final PoolableObject<T> poolableObject, final Queue<SoftReference<T>> queue) {
return new NonBlockingSoftPool<T>(poolableObject, queue);
}
}

View File

@ -0,0 +1,151 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import dorkbox.objectPool.blocking.BlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingPool
import dorkbox.objectPool.nonBlocking.NonBlockingSoftPool
import dorkbox.objectPool.suspending.SuspendingPool
import kotlinx.coroutines.channels.Channel
import java.lang.ref.SoftReference
import java.util.*
import java.util.concurrent.BlockingQueue
/**
* @author dorkbox, llc
*/
object ObjectPool {
/**
* Gets the version number.
*/
val version: String
get() = "3.0"
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
fun <T> suspending(poolObject: SuspendingPoolObject<T>, size: Int): dorkbox.objectPool.SuspendingPool<T> {
return suspending(poolObject, Channel(size), size)
}
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
fun <T> suspending(poolObject: SuspendingPoolObject<T>, channel: Channel<T>, size: Int): dorkbox.objectPool.SuspendingPool<T> {
return SuspendingPool(poolObject, channel, size)
}
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param size the size of the pool to create
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
fun <T> blocking(poolObject: PoolObject<T>, size: Int): Pool<T> {
return BlockingPool(poolObject, DisruptorBlockingQueue<T>(size), size)
}
/**
* Creates a blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty, a
* [Pool.take] will wait for a corresponding [Pool.put].
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param queue the blocking queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ArrayBlockingQueue implementation of a specific size
*/
fun <T> blocking(poolObject: PoolObject<T>, queue: BlockingQueue<T>, size: Int): Pool<T> {
return BlockingPool(poolObject, queue, size)
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will never expire or be automatically garbage collected. (see [.NonBlockingSoftReference] for pooled objects
* that will expire/GC as needed).
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlocking(poolObject: PoolObject<T>): Pool<T> {
return NonBlockingPool(poolObject)
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will never expire or be automatically garbage collected. (see [.NonBlockingSoftReference] for pooled objects
* that will expire/GC as needed).
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlocking(poolObject: PoolObject<T>, queue: Queue<T>): Pool<T> {
return NonBlockingPool(poolObject, queue)
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See [.NonBlocking]
* for pooled objects that will never expire).
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingSoftReference(poolObject: PoolObject<T>): Pool<T> {
return NonBlockingSoftPool(poolObject)
}
/**
* Creates a non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See [.NonBlocking]
* for pooled objects that will never expire).
*
* @param poolObject controls the lifecycle of the pooled objects.
* @param queue the queue implementation to use
* @param <T> the type of object used in the pool
*
* @return a blocking pool using the default ConcurrentLinkedQueue implementation
*/
fun <T> nonBlockingSoftReference(poolObject: PoolObject<T>, queue: Queue<SoftReference<T>>): Pool<T> {
return NonBlockingSoftPool(poolObject, queue)
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2014 dorkbox, llc
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,33 +13,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
package dorkbox.objectPool
/**
* @author dorkbox, llc
*/
interface Pool<T> {
/**
* Takes an object from the pool. If the pool is a {@link BlockingPool}, this will wait until an item is available in
* Takes an object from the pool. If the pool is a [BlockingPool], this will wait until an item is available in
* the pool.
* <p/>
* This method catches {@link InterruptedException} and discards it silently.
*
*
* This method catches [InterruptedException] and discards it silently.
*/
T take();
fun take(): T
/**
* Takes an object from the pool. If the pool is a {@link BlockingPool}, this will wait until an item is available in the pool.
* Takes an object from the pool. If the pool is a [BlockingPool], this will wait until an item is available in the pool.
*
* @throws InterruptedException
*/
T takeInterruptibly() throws InterruptedException;
fun takeInterruptibly(): T
/**
* Return object to the pool. If the pool is a {@link BlockingPool}, this will wake the threads that have blocked during take/takeInterruptibly()
* Return object to the pool. If the pool is a [BlockingPool], this will wake the threads that have blocked during take/takeInterruptibly()
*/
void put(T object);
fun put(`object`: T)
/**
* @return a new object instance created by the pool.
*/
T newInstance();
fun newInstance(): T
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2014 dorkbox, llc
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,28 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
public
abstract class PoolableObject<T> {
package dorkbox.objectPool
abstract class PoolObject<T> {
/**
* Called when an object is returned to the pool, useful for resetting an objects state, for example.
*/
public
void onReturn(T object) {
}
open fun onReturn(`object`: T) {}
/**
* Called when an object is taken from the pool, useful for setting an objects state, for example.
*/
public
void onTake(T object) {
}
open fun onTake(`object`: T) {}
/**
* Called when a new instance is created
*/
public abstract T create();
abstract fun newInstance(): T
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool
/**
* @author dorkbox, llc
*/
interface SuspendingPool<T> {
/**
* Takes an object from the pool. If the pool is a [SuspendingPool], this will wait until an item is available in
* the pool.
*
*
* This method catches [InterruptedException] and discards it silently.
*/
suspend fun take(): T
/**
* Takes an object from the pool. If the pool is a [SuspendingPool], this will wait until an item is available in the pool.
*
* @throws InterruptedException
*/
suspend fun takeInterruptibly(): T
/**
* Return object to the pool. If the pool is a [SuspendingPool], this will wake the threads that have blocked during take/takeInterruptibly()
*/
suspend fun put(`object`: T)
/**
* @return a new object instance created by the pool.
*/
suspend fun newInstance(): T
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool
abstract class SuspendingPoolObject<T> {
/**
* Called when an object is returned to the pool, useful for resetting an objects state, for example.
*/
open suspend fun onReturn(`object`: T) {}
/**
* Called when an object is taken from the pool, useful for setting an objects state, for example.
*/
open suspend fun onTake(`object`: T) {}
/**
* Called when a new instance is created
*/
abstract suspend fun newInstance(): T
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool.blocking
import dorkbox.objectPool.Pool
import dorkbox.objectPool.PoolObject
import java.util.concurrent.BlockingQueue
/**
* A blocking pool of a specific size, where the entire pool is initially filled, and when the pool is empty,
* a [Pool.take] will wait for a corresponding [Pool.put].
*
* @author dorkbox, llc
*/
class BlockingPool<T> internal constructor(
private val poolObject: PoolObject<T>,
private val queue: BlockingQueue<T>,
size: Int) : Pool<T> {
init {
for (x in 0 until size) {
val e = newInstance()
poolObject.onReturn(e)
queue.offer(e)
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
override fun take(): T {
return try {
takeInterruptibly()
} catch (e: InterruptedException) {
val newInstance = newInstance()
poolObject.onTake(newInstance)
newInstance
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* @throws InterruptedException
*/
override fun takeInterruptibly(): T {
val take = queue.take()
poolObject.onTake(take)
return take
}
/**
* Return object to the pool, waking the threads that have blocked during take()
*/
override fun put(`object`: T) {
poolObject.onReturn(`object`)
queue.put(`object`)
}
/**
* @return a new object instance created by the pool.
*/
override fun newInstance(): T {
return poolObject.newInstance()
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016 dorkbox, llc
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,74 +13,67 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool;
package dorkbox.objectPool.nonBlocking
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import dorkbox.objectPool.Pool
import dorkbox.objectPool.PoolObject
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
/**
* A non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will never expire or be automatically garbage collected. (see {@link #NonBlockingSoftReference(PoolableObject)} for pooled objects
* pool will never expire or be automatically garbage collected. (see [ObjectPool.NonBlockingSoftReference] for pooled objects
* that will expire/GC as needed).
*
* @author dorkbox, llc
*/
class NonBlockingPool<T> extends ObjectPool<T> {
private final Queue<T> queue;
private final PoolableObject<T> poolableObject;
internal class NonBlockingPool<T>(
private val poolObject: PoolObject<T>,
private val queue: Queue<T> = ConcurrentLinkedQueue()) : Pool<T> {
NonBlockingPool(final PoolableObject<T> poolableObject) {
this(poolableObject, new ConcurrentLinkedQueue<T>());
}
NonBlockingPool(final PoolableObject<T> poolableObject, final Queue<T> queue) {
this.poolableObject = poolableObject;
this.queue = queue;
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
@Override
public
T take() {
T take = this.queue.poll();
if (take == null) {
take = poolableObject.create();
override fun take(): T {
return try {
takeInterruptibly()
} catch (e: InterruptedException) {
val newInstance = newInstance()
poolObject.onTake(newInstance)
newInstance
}
poolableObject.onTake(take);
return take;
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
* <p/>
* This method catches {@link InterruptedException} and discards it silently.
*
* @throws InterruptedException
*/
@Override
public
T takeInterruptibly() throws InterruptedException {
return take();
override fun takeInterruptibly(): T {
var take = queue.poll()
if (take == null) {
take = poolObject.newInstance()
}
poolObject.onTake(take)
return take
}
/**
* Return object to the pool, waking the threads that have blocked during take()
*/
@Override
public
void put(T object) {
poolableObject.onReturn(object);
this.queue.offer(object);
override fun put(`object`: T) {
poolObject.onReturn(`object`)
queue.offer(`object`)
}
/**
* @return a new object instance created by the pool.
*/
@Override
public
T newInstance() {
return poolableObject.create();
override fun newInstance(): T {
return poolObject.newInstance()
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool.nonBlocking
import dorkbox.objectPool.Pool
import dorkbox.objectPool.PoolObject
import java.lang.ref.SoftReference
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
/**
* A non-blocking pool which will grow as much as needed. If the pool is empty, new objects will be created. The items in the
* pool will expire and be automatically Garbage Collected in response to memory demand. (See [ObjectPool.NonBlocking]
* for pooled objects that will never expire).
*
* @author dorkbox, llc
*/
internal class NonBlockingSoftPool<T>(
private val poolObject: PoolObject<T>,
private val queue: Queue<SoftReference<T>> = ConcurrentLinkedQueue()) : Pool<T> {
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
override fun take(): T {
return try {
takeInterruptibly()
} catch (e: InterruptedException) {
val newInstance = newInstance()
poolObject.onTake(newInstance)
newInstance
}
}
/**
* Takes an object from the pool, Blocks until an item is available in the pool.
*
* @throws InterruptedException
*/
override fun takeInterruptibly(): T {
val obj: T?
val ref: SoftReference<T>? = queue.poll()
if (ref != null) {
obj = ref.get()
if (obj != null) {
poolObject.onTake(obj)
return obj
}
}
// not in queue, so make one
val take = poolObject.newInstance()
poolObject.onTake(take)
return take
}
/**
* Return object to the pool.
*/
override fun put(`object`: T) {
poolObject.onReturn(`object`)
queue.offer(SoftReference(`object`))
}
/**
* @return a new object instance created by the pool.
*/
override fun newInstance(): T {
return poolObject.newInstance()
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.objectPool.suspending
import dorkbox.objectPool.SuspendingPool
import dorkbox.objectPool.SuspendingPoolObject
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
/**
* A blocking, suspending pool of a specific size, where the entire pool is initially filled, and when the pool
* is empty, a [Pool.take] will wait for a corresponding [Pool.put].
*
* @author dorkbox, llc
*/
class SuspendingPool<T> internal constructor(
private val poolObject: SuspendingPoolObject<T>,
private val channel: Channel<T>,
size: Int) : SuspendingPool<T> {
init {
runBlocking {
for (x in 0 until size) {
val e = newInstance()
poolObject.onReturn(e)
channel.offer(e)
}
}
}
/**
* Takes an object from the pool, Suspends until an item is available in the pool.
*
* This method catches [InterruptedException] and discards it silently.
*/
override suspend fun take(): T {
return try {
takeInterruptibly()
} catch (e: InterruptedException) {
val newInstance = newInstance()
poolObject.onTake(newInstance)
newInstance
}
}
/**
* Takes an object from the pool, Suspends until an item is available in the pool.
*
* @throws InterruptedException
*/
override suspend fun takeInterruptibly(): T {
val take = channel.receive()
poolObject.onTake(take)
return take
}
/**
* Return object to the pool, waking the threads that have suspended during take()
*/
override suspend fun put(`object`: T) {
poolObject.onReturn(`object`)
channel.send(`object`)
}
/**
* @return a new object instance created by the pool.
*/
override suspend fun newInstance(): T {
return poolObject.newInstance()
}
}

View File

@ -0,0 +1,64 @@
package dorkbox.objectPool
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Ignore
import org.junit.Test
@Ignore
class BlockingTest {
@Test
fun blockingTest() {
val pobj = object : PoolObject<String>() {
override fun newInstance(): String {
return ""
}
}
val pool = ObjectPool.blocking(pobj, 4)
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
val take4 = pool.take() // this blocks
Assert.fail("shouldn't get here")
}
@Test
fun nonblockingTest() {
val pobj = object : PoolObject<String>() {
override fun newInstance(): String {
return ""
}
}
val pool = ObjectPool.nonBlocking(pobj)
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
val take4 = pool.take() // this does not block
}
@Test
fun suspendTest() {
val pobj = object : SuspendingPoolObject<String>() {
override suspend fun newInstance(): String {
return ""
}
}
val pool = ObjectPool.suspending(pobj, 4)
runBlocking {
val take = pool.take()
val take1 = pool.take()
val take2 = pool.take()
val take3 = pool.take()
val take4 = pool.take() // this suspends
Assert.fail("shouldn't get here")
}
}
}