Updated to use the object pool. Release 5.0!

This commit is contained in:
nathan 2020-10-06 02:08:12 +02:00
parent f716b72eca
commit 96b4a78104
6 changed files with 60 additions and 72 deletions

33
LICENSE
View File

@ -579,3 +579,36 @@
[Public Domain, per Creative Commons CC0] [Public Domain, per Creative Commons CC0]
https://github.com/str4d/ed25519-java https://github.com/str4d/ed25519-java
https://github.com/str4d https://github.com/str4d
- ObjectPool - Fast, lightweight, and compatible blocking/non-blocking/soft-reference object pool for Java 6+
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/ObjectPool
Copyright 2020
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support
[The Apache Software License, Version 2.0]
https://github.com/Kotlin/kotlinx.coroutines
Copyright 2020
JetBrains s.r.o.
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2020
QOS.ch
- Conversant Disruptor - Disruptor is the highest performing intra-thread transfer mechanism available in Java.
[The Apache Software License, Version 2.0]
https://github.com/conversant/disruptor
Copyright 2020
Conversant, Inc

View File

@ -42,7 +42,7 @@ object Extras {
// set for the project // set for the project
const val description = "Encrypted, high-performance, and event-driven/reactive network stack for Java 11+" const val description = "Encrypted, high-performance, and event-driven/reactive network stack for Java 11+"
const val group = "com.dorkbox" const val group = "com.dorkbox"
const val version = "5.0-beta7" const val version = "5.0"
// set as project.ext // set as project.ext
const val name = "Network" const val name = "Network"
@ -71,6 +71,8 @@ GradleUtils.compileConfiguration(JavaVersion.VERSION_11) { kotlinOptions ->
// ratelimiter, "other" package // ratelimiter, "other" package
// ping, rest of unit tests // ping, rest of unit tests
// getConnectionUpgradeType // getConnectionUpgradeType
// ability to send with a function callback (using RMI waiter type stuff for callbacks)
// use conscrypt?!
// java 14 is faster with aeron! // java 14 is faster with aeron!
// NOTE: now using aeron instead of netty // NOTE: now using aeron instead of netty
@ -213,13 +215,13 @@ dependencies {
implementation("de.javakaffee:kryo-serializers:0.45") implementation("de.javakaffee:kryo-serializers:0.45")
// https://github.com/jpountz/lz4-java // https://github.com/jpountz/lz4-java
implementation("net.jpountz.lz4:lz4:1.3.0") // implementation("net.jpountz.lz4:lz4:1.3.0")
// this is NOT the same thing as LMAX disruptor. // this is NOT the same thing as LMAX disruptor.
// This is just a really fast queue (where LMAX is a fast queue + other things w/ a difficult DSL) // This is just a really fast queue (where LMAX is a fast queue + other things w/ a difficult DSL)
// https://github.com/conversant/disruptor_benchmark // https://github.com/conversant/disruptor_benchmark
// https://www.youtube.com/watch?v=jVMOgQgYzWU // https://www.youtube.com/watch?v=jVMOgQgYzWU
implementation("com.conversantmedia:disruptor:1.2.17") implementation("com.conversantmedia:disruptor:1.2.19")
// https://github.com/jhalterman/typetools // https://github.com/jhalterman/typetools
implementation("net.jodah:typetools:0.6.2") implementation("net.jodah:typetools:0.6.2")
@ -227,14 +229,15 @@ dependencies {
// https://github.com/dorkbox // https://github.com/dorkbox
implementation("com.dorkbox:Annotations:3.1") implementation("com.dorkbox:Annotations:3.1")
implementation("com.dorkbox:MinLog-SLF4J:2.0") implementation("com.dorkbox:MinLog-SLF4J:2.0")
implementation("com.dorkbox:Utilities:1.8.2") implementation("com.dorkbox:Utilities:1.8.3")
implementation("com.dorkbox:NetworkUtils:2.0") implementation("com.dorkbox:NetworkUtils:2.0")
implementation("com.dorkbox:ObjectPool:3.0")
// really fast storage // really fast storage
// https://github.com/lmdbjava/lmdbjava // https://github.com/lmdbjava/lmdbjava
compileOnly("org.lmdbjava:lmdbjava:0.8.1") compileOnly("org.lmdbjava:lmdbjava:0.8.1")
// https://github.com/OpenHFT/Chronicle-Map // https://github.com/OpenHFT/Chronicle-Map
compileOnly("net.openhft:chronicle-map:3.20.3") compileOnly("net.openhft:chronicle-map:3.20.40")
// Caffeine High-throughput Timeout Cache // Caffeine High-throughput Timeout Cache
@ -249,6 +252,8 @@ dependencies {
implementation("org.slf4j:slf4j-api:1.7.30") implementation("org.slf4j:slf4j-api:1.7.30")
testImplementation("org.lmdbjava:lmdbjava:0.8.1")
testImplementation("net.openhft:chronicle-map:3.20.3")
testImplementation("junit:junit:4.13") testImplementation("junit:junit:4.13")
testImplementation("ch.qos.logback:logback-classic:1.2.3") testImplementation("ch.qos.logback:logback-classic:1.2.3")

View File

@ -15,7 +15,6 @@
*/ */
package dorkbox.network.serialization package dorkbox.network.serialization
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory import com.esotericsoftware.kryo.SerializerFactory
@ -37,6 +36,9 @@ import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.rmi.messages.MethodResponseSerializer import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.RmiClientSerializer import dorkbox.network.rmi.messages.RmiClientSerializer
import dorkbox.network.rmi.messages.RmiServerSerializer import dorkbox.network.rmi.messages.RmiServerSerializer
import dorkbox.objectPool.Pool
import dorkbox.objectPool.ObjectPool
import dorkbox.objectPool.PoolObject
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.util.serialization.SerializationDefaults import dorkbox.util.serialization.SerializationDefaults
import kotlinx.atomicfu.atomic import kotlinx.atomicfu.atomic
@ -85,7 +87,6 @@ open class Serialization(private val references: Boolean = true, private val fac
private lateinit var logger: KLogger private lateinit var logger: KLogger
private var initialized = atomic(false) private var initialized = atomic(false)
private val initializedKryoCount = atomic(0)
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class)
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
@ -118,12 +119,17 @@ open class Serialization(private val references: Boolean = true, private val fac
// NOTE: These following can ONLY be called on a single thread! // NOTE: These following can ONLY be called on a single thread!
private var readKryo = initGlobalKryo() private var readKryo = initGlobalKryo()
private var kryoPoolSize = 16 private val kryoPool: Pool<KryoExtra>
val kryoInUse = atomic(0) init {
val poolObject = object : PoolObject<KryoExtra>() {
override fun newInstance(): KryoExtra {
return initKryo()
}
}
@Volatile kryoPool = ObjectPool.nonBlocking(poolObject)
private var kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize) }
/** /**
@ -334,7 +340,6 @@ open class Serialization(private val references: Boolean = true, private val fac
* called as the first thing inside when initializing the classesToRegister * called as the first thing inside when initializing the classesToRegister
*/ */
private fun initKryo(): KryoExtra { private fun initKryo(): KryoExtra {
initializedKryoCount.getAndIncrement()
val kryo = KryoExtra() val kryo = KryoExtra()
kryo.instantiatorStrategy = instantiatorStrategy kryo.instantiatorStrategy = instantiatorStrategy
@ -610,67 +615,18 @@ open class Serialization(private val references: Boolean = true, private val fac
return initializeClassRegistrations(kryo) return initializeClassRegistrations(kryo)
} }
/**
* @return The number of kryo instances created. This does not reflect the size of the pool, just the number of
* existing kryo instances.
*
* If there are more kryo instances than the pool size, they will end up on the heap and will contribute to GC pauses.
*/
fun getInitializedKryoCount(): Int {
return initializedKryoCount.value
}
/**
* @return The number of kryo instances in use.
*/
fun getInUseKryoCount(): Int {
return kryoInUse.value
}
/** /**
* @return takes a kryo instance from the pool, or creates one if the pool was empty * @return takes a kryo instance from the pool, or creates one if the pool was empty
*/ */
fun takeKryo(): KryoExtra { fun takeKryo(): KryoExtra {
kryoInUse.getAndIncrement() return kryoPool.take()
// ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created
return kryoPool.poll() ?: initKryo()
} }
/** /**
* Returns a kryo instance to the pool for re-use later on * Returns a kryo instance to the pool for re-use later on
*/ */
fun returnKryo(kryo: KryoExtra) { fun returnKryo(kryo: KryoExtra) {
val kryoCount = kryoInUse.getAndDecrement() kryoPool.put(kryo)
if (kryoCount > kryoPoolSize) {
// this is CLEARLY a problem, as we have more kryos in use that our pool can support.
// This happens when we send messages REALLY fast.
//
// We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit)
synchronized(kryoInUse) {
// we have a double check here on purpose. only 1 will work
if (kryoCount > kryoPoolSize) {
val oldPool = kryoPool
val oldSize = kryoPoolSize
val newSize = kryoPoolSize * 2
kryoPoolSize = newSize
kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
// take all of the old kryos and put them in the new one
val array = arrayOfNulls<KryoExtra>(oldSize)
val count = oldPool.remove(array)
for (i in 0 until count) {
kryoPool.offer(array[i])
}
}
}
}
kryoPool.offer(kryo)
} }
/** /**

View File

@ -119,9 +119,7 @@ class RmiSpamAsyncTest : BaseTest() {
} }
waitForThreads() waitForThreads()
Assert.assertEquals(totalRuns.toLong(), counter.get()) Assert.assertEquals(totalRuns, counter.get())
client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}")
server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}")
} }
private interface TestObject { private interface TestObject {

View File

@ -119,9 +119,7 @@ class RmiSpamSyncSuspendingTest : BaseTest() {
} }
waitForThreads() waitForThreads()
Assert.assertEquals(totalRuns.toLong(), counter.get()) Assert.assertEquals(totalRuns, counter.get())
client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}")
server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}")
} }
private interface TestObject { private interface TestObject {

View File

@ -120,9 +120,7 @@ class RmiSpamSyncTest : BaseTest() {
} }
waitForThreads() waitForThreads()
Assert.assertEquals(totalRuns.toLong(), counter.get()) Assert.assertEquals(totalRuns, counter.get())
client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}")
server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}")
} }
private interface TestObject { private interface TestObject {