diff --git a/LICENSE b/LICENSE index fbc79fe3..e58d6fa4 100644 --- a/LICENSE +++ b/LICENSE @@ -579,3 +579,36 @@ [Public Domain, per Creative Commons CC0] https://github.com/str4d/ed25519-java https://github.com/str4d + + - ObjectPool - Fast, lightweight, and compatible blocking/non-blocking/soft-reference object pool for Java 6+ + [The Apache Software License, Version 2.0] + https://git.dorkbox.com/dorkbox/ObjectPool + Copyright 2020 + Dorkbox LLC + + Extra license information + - Kotlin - + [The Apache Software License, Version 2.0] + https://github.com/JetBrains/kotlin + Copyright 2020 + JetBrains s.r.o. and Kotlin Programming Language contributors + Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply + See: https://github.com/JetBrains/kotlin/blob/master/license/README.md + + - kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support + [The Apache Software License, Version 2.0] + https://github.com/Kotlin/kotlinx.coroutines + Copyright 2020 + JetBrains s.r.o. + + - SLF4J - Simple facade or abstraction for various logging frameworks + [MIT License] + http://www.slf4j.org + Copyright 2020 + QOS.ch + + - Conversant Disruptor - Disruptor is the highest performing intra-thread transfer mechanism available in Java. + [The Apache Software License, Version 2.0] + https://github.com/conversant/disruptor + Copyright 2020 + Conversant, Inc diff --git a/build.gradle.kts b/build.gradle.kts index 5fd7ad70..a4b0e53a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -42,7 +42,7 @@ object Extras { // set for the project const val description = "Encrypted, high-performance, and event-driven/reactive network stack for Java 11+" const val group = "com.dorkbox" - const val version = "5.0-beta7" + const val version = "5.0" // set as project.ext const val name = "Network" @@ -71,6 +71,8 @@ GradleUtils.compileConfiguration(JavaVersion.VERSION_11) { kotlinOptions -> // ratelimiter, "other" package // ping, rest of unit tests // getConnectionUpgradeType +// ability to send with a function callback (using RMI waiter type stuff for callbacks) +// use conscrypt?! // java 14 is faster with aeron! // NOTE: now using aeron instead of netty @@ -213,13 +215,13 @@ dependencies { implementation("de.javakaffee:kryo-serializers:0.45") // https://github.com/jpountz/lz4-java - implementation("net.jpountz.lz4:lz4:1.3.0") +// implementation("net.jpountz.lz4:lz4:1.3.0") // this is NOT the same thing as LMAX disruptor. // This is just a really fast queue (where LMAX is a fast queue + other things w/ a difficult DSL) // https://github.com/conversant/disruptor_benchmark // https://www.youtube.com/watch?v=jVMOgQgYzWU - implementation("com.conversantmedia:disruptor:1.2.17") + implementation("com.conversantmedia:disruptor:1.2.19") // https://github.com/jhalterman/typetools implementation("net.jodah:typetools:0.6.2") @@ -227,14 +229,15 @@ dependencies { // https://github.com/dorkbox implementation("com.dorkbox:Annotations:3.1") implementation("com.dorkbox:MinLog-SLF4J:2.0") - implementation("com.dorkbox:Utilities:1.8.2") + implementation("com.dorkbox:Utilities:1.8.3") implementation("com.dorkbox:NetworkUtils:2.0") + implementation("com.dorkbox:ObjectPool:3.0") // really fast storage // https://github.com/lmdbjava/lmdbjava compileOnly("org.lmdbjava:lmdbjava:0.8.1") // https://github.com/OpenHFT/Chronicle-Map - compileOnly("net.openhft:chronicle-map:3.20.3") + compileOnly("net.openhft:chronicle-map:3.20.40") // Caffeine High-throughput Timeout Cache @@ -249,6 +252,8 @@ dependencies { implementation("org.slf4j:slf4j-api:1.7.30") + testImplementation("org.lmdbjava:lmdbjava:0.8.1") + testImplementation("net.openhft:chronicle-map:3.20.3") testImplementation("junit:junit:4.13") testImplementation("ch.qos.logback:logback-classic:1.2.3") diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index c60ac992..e4ba1bda 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -15,7 +15,6 @@ */ package dorkbox.network.serialization -import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.SerializerFactory @@ -37,6 +36,9 @@ import dorkbox.network.rmi.messages.MethodResponse import dorkbox.network.rmi.messages.MethodResponseSerializer import dorkbox.network.rmi.messages.RmiClientSerializer import dorkbox.network.rmi.messages.RmiServerSerializer +import dorkbox.objectPool.Pool +import dorkbox.objectPool.ObjectPool +import dorkbox.objectPool.PoolObject import dorkbox.os.OS import dorkbox.util.serialization.SerializationDefaults import kotlinx.atomicfu.atomic @@ -85,7 +87,6 @@ open class Serialization(private val references: Boolean = true, private val fac private lateinit var logger: KLogger private var initialized = atomic(false) - private val initializedKryoCount = atomic(0) // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. @@ -118,12 +119,17 @@ open class Serialization(private val references: Boolean = true, private val fac // NOTE: These following can ONLY be called on a single thread! private var readKryo = initGlobalKryo() - private var kryoPoolSize = 16 + private val kryoPool: Pool - val kryoInUse = atomic(0) + init { + val poolObject = object : PoolObject() { + override fun newInstance(): KryoExtra { + return initKryo() + } + } - @Volatile - private var kryoPool = MultithreadConcurrentQueue(kryoPoolSize) + kryoPool = ObjectPool.nonBlocking(poolObject) + } /** @@ -334,7 +340,6 @@ open class Serialization(private val references: Boolean = true, private val fac * called as the first thing inside when initializing the classesToRegister */ private fun initKryo(): KryoExtra { - initializedKryoCount.getAndIncrement() val kryo = KryoExtra() kryo.instantiatorStrategy = instantiatorStrategy @@ -610,67 +615,18 @@ open class Serialization(private val references: Boolean = true, private val fac return initializeClassRegistrations(kryo) } - /** - * @return The number of kryo instances created. This does not reflect the size of the pool, just the number of - * existing kryo instances. - * - * If there are more kryo instances than the pool size, they will end up on the heap and will contribute to GC pauses. - */ - fun getInitializedKryoCount(): Int { - return initializedKryoCount.value - } - - /** - * @return The number of kryo instances in use. - */ - fun getInUseKryoCount(): Int { - return kryoInUse.value - } - /** * @return takes a kryo instance from the pool, or creates one if the pool was empty */ fun takeKryo(): KryoExtra { - kryoInUse.getAndIncrement() - - // ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created - return kryoPool.poll() ?: initKryo() + return kryoPool.take() } /** * Returns a kryo instance to the pool for re-use later on */ fun returnKryo(kryo: KryoExtra) { - val kryoCount = kryoInUse.getAndDecrement() - if (kryoCount > kryoPoolSize) { - // this is CLEARLY a problem, as we have more kryos in use that our pool can support. - // This happens when we send messages REALLY fast. - // - // We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit) - - synchronized(kryoInUse) { - // we have a double check here on purpose. only 1 will work - if (kryoCount > kryoPoolSize) { - val oldPool = kryoPool - val oldSize = kryoPoolSize - val newSize = kryoPoolSize * 2 - - kryoPoolSize = newSize - kryoPool = MultithreadConcurrentQueue(kryoPoolSize) - - - // take all of the old kryos and put them in the new one - val array = arrayOfNulls(oldSize) - val count = oldPool.remove(array) - - for (i in 0 until count) { - kryoPool.offer(array[i]) - } - } - } - } - - kryoPool.offer(kryo) + kryoPool.put(kryo) } /** diff --git a/test/dorkboxTest/network/rmi/RmiSpamAsyncTest.kt b/test/dorkboxTest/network/rmi/RmiSpamAsyncTest.kt index d53d044d..da8902f6 100644 --- a/test/dorkboxTest/network/rmi/RmiSpamAsyncTest.kt +++ b/test/dorkboxTest/network/rmi/RmiSpamAsyncTest.kt @@ -119,9 +119,7 @@ class RmiSpamAsyncTest : BaseTest() { } waitForThreads() - Assert.assertEquals(totalRuns.toLong(), counter.get()) - client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}") - server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}") + Assert.assertEquals(totalRuns, counter.get()) } private interface TestObject { diff --git a/test/dorkboxTest/network/rmi/RmiSpamSyncSuspendingTest.kt b/test/dorkboxTest/network/rmi/RmiSpamSyncSuspendingTest.kt index 687f4775..1ff20b4a 100644 --- a/test/dorkboxTest/network/rmi/RmiSpamSyncSuspendingTest.kt +++ b/test/dorkboxTest/network/rmi/RmiSpamSyncSuspendingTest.kt @@ -119,9 +119,7 @@ class RmiSpamSyncSuspendingTest : BaseTest() { } waitForThreads() - Assert.assertEquals(totalRuns.toLong(), counter.get()) - client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}") - server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}") + Assert.assertEquals(totalRuns, counter.get()) } private interface TestObject { diff --git a/test/dorkboxTest/network/rmi/RmiSpamSyncTest.kt b/test/dorkboxTest/network/rmi/RmiSpamSyncTest.kt index bada20d2..8450bb5e 100644 --- a/test/dorkboxTest/network/rmi/RmiSpamSyncTest.kt +++ b/test/dorkboxTest/network/rmi/RmiSpamSyncTest.kt @@ -120,9 +120,7 @@ class RmiSpamSyncTest : BaseTest() { } waitForThreads() - Assert.assertEquals(totalRuns.toLong(), counter.get()) - client.logger.error("kryos generated: ${client.config.serialization.getInitializedKryoCount()}") - server.logger.error("kryos generated: ${server.config.serialization.getInitializedKryoCount()}") + Assert.assertEquals(totalRuns, counter.get()) } private interface TestObject {