From ac874198c207a9613ecae6785006ce6ad55c1a4e Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 9 Sep 2020 19:45:57 +0200 Subject: [PATCH] WIP kryo-pool --- src/dorkbox/network/Client.kt | 2 +- src/dorkbox/network/Server.kt | 4 +-- .../network/serialization/Serialization.kt | 33 ++++++++++++++++--- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 8cc39098..039b51f1 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -347,7 +347,7 @@ open class Client(config: Configuration = Configuration /////////////// // we setup our kryo information once we connect to a server (using the server's kryo registration details) - if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails)) { + if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails, actionDispatch)) { handshakeConnection.close() // because we are getting the class registration details from the SERVER, this should never be the case. diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 2babddac..98641f16 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -145,7 +145,7 @@ open class Server(config: ServerConfiguration = ServerC if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount} // we are done with initial configuration, now finish serialization - serialization.finishInit(type, settingsStore) + serialization.finishInit(type, settingsStore, ByteArray(0), actionDispatch) } override fun newException(message: String, cause: Throwable?): Throwable { @@ -368,7 +368,7 @@ open class Server(config: ServerConfiguration = ServerC // val port = remoteIpAndPort.substring(splitPoint+1) // this should never be null, because we are feeding it a valid IP address from aeron - // maybe IPv4, maybe IPv6!!! + // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is. val clientAddress = IP.getByName(clientAddressString)!! diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index eb1abb29..9ed56a74 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -125,6 +125,10 @@ open class Serialization(private val references: Boolean = true, private val fac // The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) // NOTE: These following can ONLY be called on a single thread! private var readKryo = initGlobalKryo() + +// private val kryoPoolSize = 16384 +// private val kryoPool = Channel(kryoPoolSize) + private val kryoPool = MultithreadConcurrentQueue(1024) // reasonable size of available kryo's? @@ -390,11 +394,14 @@ open class Serialization(private val references: Boolean = true, private val fac * * This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown. */ - internal fun finishInit(type: Class<*>, settingsStore: SettingsStore, kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean { + internal fun finishInit(type: Class<*>, + settingsStore: SettingsStore, + kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0), + actionDispatch: CoroutineScope): Boolean { logger = KotlinLogging.logger(type.simpleName) // this will set up the class registration information - return if (type == Server::class.java) { + val success = if (type == Server::class.java) { if (!initialized.compareAndSet(expect = false, update = true)) { logger.error("Unable to initialize serialization more than once!") return false @@ -427,6 +434,22 @@ open class Serialization(private val references: Boolean = true, private val fac val kryo = initKryo() // this will initialize the class registrations initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo) } + +// if (success) { +// // now we want to populate our kryopool. It doesn't have to be on the same thread +// actionDispatch.launch { +//// runBlocking { +// for (i in 0 until kryoHandshakePoolSize) { +// kryoHandshakePool.send(initHandshakeKryo()) +// } +// +// for (i in 0 until kryoPoolSize) { +// kryoPool.send(initKryo()) +// } +// } +// } + + return success } private fun initializeClassRegistrations(kryo: KryoExtra): Boolean { @@ -628,16 +651,18 @@ open class Serialization(private val references: Boolean = true, private val fac /** * @return takes a kryo instance from the pool, or creates one if the pool was empty */ - fun takeKryo(): KryoExtra { + suspend fun takeKryo(): KryoExtra { // ALWAYS get as many as needed. Recycle them to prevent too many getting created +// return kryoPool.receive() return kryoPool.poll() ?: initKryo() } /** * Returns a kryo instance to the pool for re-use later on */ - fun returnKryo(kryo: KryoExtra) { + suspend fun returnKryo(kryo: KryoExtra) { // return as much as we can. don't suspend if the pool is full, we just throw it away. +// kryoPool.send(kryo) kryoPool.offer(kryo) }