WIP kryo-pool

This commit is contained in:
nathan 2020-09-09 19:45:57 +02:00
parent 594019020b
commit ac874198c2
3 changed files with 32 additions and 7 deletions

View File

@ -347,7 +347,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
/////////////// ///////////////
// we setup our kryo information once we connect to a server (using the server's kryo registration details) // we setup our kryo information once we connect to a server (using the server's kryo registration details)
if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails)) { if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails, actionDispatch)) {
handshakeConnection.close() handshakeConnection.close()
// because we are getting the class registration details from the SERVER, this should never be the case. // because we are getting the class registration details from the SERVER, this should never be the case.

View File

@ -145,7 +145,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount} if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount}
// we are done with initial configuration, now finish serialization // we are done with initial configuration, now finish serialization
serialization.finishInit(type, settingsStore) serialization.finishInit(type, settingsStore, ByteArray(0), actionDispatch)
} }
override fun newException(message: String, cause: Throwable?): Throwable { override fun newException(message: String, cause: Throwable?): Throwable {
@ -368,7 +368,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// val port = remoteIpAndPort.substring(splitPoint+1) // val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron // this should never be null, because we are feeding it a valid IP address from aeron
// maybe IPv4, maybe IPv6!!! // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is.
val clientAddress = IP.getByName(clientAddressString)!! val clientAddress = IP.getByName(clientAddressString)!!

View File

@ -125,6 +125,10 @@ open class Serialization(private val references: Boolean = true, private val fac
// The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) // The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem)
// NOTE: These following can ONLY be called on a single thread! // NOTE: These following can ONLY be called on a single thread!
private var readKryo = initGlobalKryo() private var readKryo = initGlobalKryo()
// private val kryoPoolSize = 16384
// private val kryoPool = Channel<KryoExtra>(kryoPoolSize)
private val kryoPool = MultithreadConcurrentQueue<KryoExtra>(1024) // reasonable size of available kryo's? private val kryoPool = MultithreadConcurrentQueue<KryoExtra>(1024) // reasonable size of available kryo's?
@ -390,11 +394,14 @@ open class Serialization(private val references: Boolean = true, private val fac
* *
* This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown. * This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown.
*/ */
internal fun finishInit(type: Class<*>, settingsStore: SettingsStore, kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean { internal fun finishInit(type: Class<*>,
settingsStore: SettingsStore,
kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0),
actionDispatch: CoroutineScope): Boolean {
logger = KotlinLogging.logger(type.simpleName) logger = KotlinLogging.logger(type.simpleName)
// this will set up the class registration information // this will set up the class registration information
return if (type == Server::class.java) { val success = if (type == Server::class.java) {
if (!initialized.compareAndSet(expect = false, update = true)) { if (!initialized.compareAndSet(expect = false, update = true)) {
logger.error("Unable to initialize serialization more than once!") logger.error("Unable to initialize serialization more than once!")
return false return false
@ -427,6 +434,22 @@ open class Serialization(private val references: Boolean = true, private val fac
val kryo = initKryo() // this will initialize the class registrations val kryo = initKryo() // this will initialize the class registrations
initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo) initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo)
} }
// if (success) {
// // now we want to populate our kryopool. It doesn't have to be on the same thread
// actionDispatch.launch {
//// runBlocking {
// for (i in 0 until kryoHandshakePoolSize) {
// kryoHandshakePool.send(initHandshakeKryo())
// }
//
// for (i in 0 until kryoPoolSize) {
// kryoPool.send(initKryo())
// }
// }
// }
return success
} }
private fun initializeClassRegistrations(kryo: KryoExtra): Boolean { private fun initializeClassRegistrations(kryo: KryoExtra): Boolean {
@ -628,16 +651,18 @@ open class Serialization(private val references: Boolean = true, private val fac
/** /**
* @return takes a kryo instance from the pool, or creates one if the pool was empty * @return takes a kryo instance from the pool, or creates one if the pool was empty
*/ */
fun takeKryo(): KryoExtra { suspend fun takeKryo(): KryoExtra {
// ALWAYS get as many as needed. Recycle them to prevent too many getting created // ALWAYS get as many as needed. Recycle them to prevent too many getting created
// return kryoPool.receive()
return kryoPool.poll() ?: initKryo() return kryoPool.poll() ?: initKryo()
} }
/** /**
* Returns a kryo instance to the pool for re-use later on * Returns a kryo instance to the pool for re-use later on
*/ */
fun returnKryo(kryo: KryoExtra) { suspend fun returnKryo(kryo: KryoExtra) {
// return as much as we can. don't suspend if the pool is full, we just throw it away. // return as much as we can. don't suspend if the pool is full, we just throw it away.
// kryoPool.send(kryo)
kryoPool.offer(kryo) kryoPool.offer(kryo)
} }