diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 32d38c7c..84475659 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -361,7 +361,7 @@ open class Client(config: Configuration = Configuration /////////////// // we setup our kryo information once we connect to a server (using the server's kryo registration details) - if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails, actionDispatch)) { + if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails)) { handshakeConnection.close() // because we are getting the class registration details from the SERVER, this should never be the case. diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 83c72162..63265c4c 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -169,7 +169,7 @@ open class Server(config: ServerConfiguration = ServerC if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount} // we are done with initial configuration, now finish serialization - serialization.finishInit(type, settingsStore, ByteArray(0), actionDispatch) + serialization.finishInit(type, settingsStore, ByteArray(0)) } override fun newException(message: String, cause: Throwable?): Throwable { diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 9ed56a74..b7202cf3 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -44,7 +44,6 @@ import dorkbox.os.OS import dorkbox.util.serialization.SerializationDefaults import dorkbox.util.serialization.SerializationManager import kotlinx.atomicfu.atomic -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.runBlocking import mu.KLogger import mu.KotlinLogging @@ -126,10 +125,12 @@ open class Serialization(private val references: Boolean = true, private val fac // NOTE: These following can ONLY be called on a single thread! private var readKryo = initGlobalKryo() -// private val kryoPoolSize = 16384 -// private val kryoPool = Channel(kryoPoolSize) + private var kryoPoolSize = 16 - private val kryoPool = MultithreadConcurrentQueue(1024) // reasonable size of available kryo's? + val kryoInUse = atomic(0) + + @Volatile + private var kryoPool = MultithreadConcurrentQueue(kryoPoolSize) /** @@ -396,12 +397,12 @@ open class Serialization(private val references: Boolean = true, private val fac */ internal fun finishInit(type: Class<*>, settingsStore: SettingsStore, - kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0), - actionDispatch: CoroutineScope): Boolean { + kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean { + logger = KotlinLogging.logger(type.simpleName) // this will set up the class registration information - val success = if (type == Server::class.java) { + return if (type == Server::class.java) { if (!initialized.compareAndSet(expect = false, update = true)) { logger.error("Unable to initialize serialization more than once!") return false @@ -434,22 +435,6 @@ open class Serialization(private val references: Boolean = true, private val fac val kryo = initKryo() // this will initialize the class registrations initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo) } - -// if (success) { -// // now we want to populate our kryopool. It doesn't have to be on the same thread -// actionDispatch.launch { -//// runBlocking { -// for (i in 0 until kryoHandshakePoolSize) { -// kryoHandshakePool.send(initHandshakeKryo()) -// } -// -// for (i in 0 until kryoPoolSize) { -// kryoPool.send(initKryo()) -// } -// } -// } - - return success } private fun initializeClassRegistrations(kryo: KryoExtra): Boolean { @@ -648,21 +633,56 @@ open class Serialization(private val references: Boolean = true, private val fac return initializedKryoCount.value } + /** + * @return The number of kryo instances in use. + */ + fun getInUseKryoCount(): Int { + return kryoInUse.value + } + /** * @return takes a kryo instance from the pool, or creates one if the pool was empty */ - suspend fun takeKryo(): KryoExtra { - // ALWAYS get as many as needed. Recycle them to prevent too many getting created -// return kryoPool.receive() + fun takeKryo(): KryoExtra { + kryoInUse.getAndIncrement() + + // ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created return kryoPool.poll() ?: initKryo() } /** * Returns a kryo instance to the pool for re-use later on */ - suspend fun returnKryo(kryo: KryoExtra) { - // return as much as we can. don't suspend if the pool is full, we just throw it away. -// kryoPool.send(kryo) + fun returnKryo(kryo: KryoExtra) { + val kryoCount = kryoInUse.getAndDecrement() + if (kryoCount > kryoPoolSize) { + // this is CLEARLY a problem, as we have more kryos in use that our pool can support. + // This happens when we send messages REALLY fast. + // + // We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit) + + synchronized(kryoInUse) { + // we have a double check here on purpose. only 1 will work + if (kryoCount > kryoPoolSize) { + val oldPool = kryoPool + val oldSize = kryoPoolSize + val newSize = kryoPoolSize * 2 + + kryoPoolSize = newSize + kryoPool = MultithreadConcurrentQueue(kryoPoolSize) + + + // take all of the old kryos and put them in the new one + val array = arrayOfNulls(oldSize) + val count = oldPool.remove(array) + + for (i in 0 until count) { + kryoPool.offer(array[i]) + } + } + } + } + kryoPool.offer(kryo) }