KryoPool now will automatically grow if too many are read before they are returned
This commit is contained in:
parent
9c34b412b1
commit
49badc0836
|
@ -361,7 +361,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
|
||||||
///////////////
|
///////////////
|
||||||
|
|
||||||
// we setup our kryo information once we connect to a server (using the server's kryo registration details)
|
// we setup our kryo information once we connect to a server (using the server's kryo registration details)
|
||||||
if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails, actionDispatch)) {
|
if (!serialization.finishInit(type, settingsStore, connectionInfo.kryoRegistrationDetails)) {
|
||||||
handshakeConnection.close()
|
handshakeConnection.close()
|
||||||
|
|
||||||
// because we are getting the class registration details from the SERVER, this should never be the case.
|
// because we are getting the class registration details from the SERVER, this should never be the case.
|
||||||
|
|
|
@ -169,7 +169,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount}
|
if (config.maxConnectionsPerIpAddress == 0) { config.maxConnectionsPerIpAddress = config.maxClientCount}
|
||||||
|
|
||||||
// we are done with initial configuration, now finish serialization
|
// we are done with initial configuration, now finish serialization
|
||||||
serialization.finishInit(type, settingsStore, ByteArray(0), actionDispatch)
|
serialization.finishInit(type, settingsStore, ByteArray(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun newException(message: String, cause: Throwable?): Throwable {
|
override fun newException(message: String, cause: Throwable?): Throwable {
|
||||||
|
|
|
@ -44,7 +44,6 @@ import dorkbox.os.OS
|
||||||
import dorkbox.util.serialization.SerializationDefaults
|
import dorkbox.util.serialization.SerializationDefaults
|
||||||
import dorkbox.util.serialization.SerializationManager
|
import dorkbox.util.serialization.SerializationManager
|
||||||
import kotlinx.atomicfu.atomic
|
import kotlinx.atomicfu.atomic
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import mu.KLogger
|
import mu.KLogger
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
|
@ -126,10 +125,12 @@ open class Serialization(private val references: Boolean = true, private val fac
|
||||||
// NOTE: These following can ONLY be called on a single thread!
|
// NOTE: These following can ONLY be called on a single thread!
|
||||||
private var readKryo = initGlobalKryo()
|
private var readKryo = initGlobalKryo()
|
||||||
|
|
||||||
// private val kryoPoolSize = 16384
|
private var kryoPoolSize = 16
|
||||||
// private val kryoPool = Channel<KryoExtra>(kryoPoolSize)
|
|
||||||
|
|
||||||
private val kryoPool = MultithreadConcurrentQueue<KryoExtra>(1024) // reasonable size of available kryo's?
|
val kryoInUse = atomic(0)
|
||||||
|
|
||||||
|
@Volatile
|
||||||
|
private var kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -396,12 +397,12 @@ open class Serialization(private val references: Boolean = true, private val fac
|
||||||
*/
|
*/
|
||||||
internal fun finishInit(type: Class<*>,
|
internal fun finishInit(type: Class<*>,
|
||||||
settingsStore: SettingsStore,
|
settingsStore: SettingsStore,
|
||||||
kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0),
|
kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean {
|
||||||
actionDispatch: CoroutineScope): Boolean {
|
|
||||||
logger = KotlinLogging.logger(type.simpleName)
|
logger = KotlinLogging.logger(type.simpleName)
|
||||||
|
|
||||||
// this will set up the class registration information
|
// this will set up the class registration information
|
||||||
val success = if (type == Server::class.java) {
|
return if (type == Server::class.java) {
|
||||||
if (!initialized.compareAndSet(expect = false, update = true)) {
|
if (!initialized.compareAndSet(expect = false, update = true)) {
|
||||||
logger.error("Unable to initialize serialization more than once!")
|
logger.error("Unable to initialize serialization more than once!")
|
||||||
return false
|
return false
|
||||||
|
@ -434,22 +435,6 @@ open class Serialization(private val references: Boolean = true, private val fac
|
||||||
val kryo = initKryo() // this will initialize the class registrations
|
val kryo = initKryo() // this will initialize the class registrations
|
||||||
initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo)
|
initializeClient(kryoRegistrationDetailsFromServer, classesToRegisterForRmi, kryo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (success) {
|
|
||||||
// // now we want to populate our kryopool. It doesn't have to be on the same thread
|
|
||||||
// actionDispatch.launch {
|
|
||||||
//// runBlocking {
|
|
||||||
// for (i in 0 until kryoHandshakePoolSize) {
|
|
||||||
// kryoHandshakePool.send(initHandshakeKryo())
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for (i in 0 until kryoPoolSize) {
|
|
||||||
// kryoPool.send(initKryo())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
return success
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun initializeClassRegistrations(kryo: KryoExtra): Boolean {
|
private fun initializeClassRegistrations(kryo: KryoExtra): Boolean {
|
||||||
|
@ -648,21 +633,56 @@ open class Serialization(private val references: Boolean = true, private val fac
|
||||||
return initializedKryoCount.value
|
return initializedKryoCount.value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The number of kryo instances in use.
|
||||||
|
*/
|
||||||
|
fun getInUseKryoCount(): Int {
|
||||||
|
return kryoInUse.value
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return takes a kryo instance from the pool, or creates one if the pool was empty
|
* @return takes a kryo instance from the pool, or creates one if the pool was empty
|
||||||
*/
|
*/
|
||||||
suspend fun takeKryo(): KryoExtra {
|
fun takeKryo(): KryoExtra {
|
||||||
// ALWAYS get as many as needed. Recycle them to prevent too many getting created
|
kryoInUse.getAndIncrement()
|
||||||
// return kryoPool.receive()
|
|
||||||
|
// ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created
|
||||||
return kryoPool.poll() ?: initKryo()
|
return kryoPool.poll() ?: initKryo()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a kryo instance to the pool for re-use later on
|
* Returns a kryo instance to the pool for re-use later on
|
||||||
*/
|
*/
|
||||||
suspend fun returnKryo(kryo: KryoExtra) {
|
fun returnKryo(kryo: KryoExtra) {
|
||||||
// return as much as we can. don't suspend if the pool is full, we just throw it away.
|
val kryoCount = kryoInUse.getAndDecrement()
|
||||||
// kryoPool.send(kryo)
|
if (kryoCount > kryoPoolSize) {
|
||||||
|
// this is CLEARLY a problem, as we have more kryos in use that our pool can support.
|
||||||
|
// This happens when we send messages REALLY fast.
|
||||||
|
//
|
||||||
|
// We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit)
|
||||||
|
|
||||||
|
synchronized(kryoInUse) {
|
||||||
|
// we have a double check here on purpose. only 1 will work
|
||||||
|
if (kryoCount > kryoPoolSize) {
|
||||||
|
val oldPool = kryoPool
|
||||||
|
val oldSize = kryoPoolSize
|
||||||
|
val newSize = kryoPoolSize * 2
|
||||||
|
|
||||||
|
kryoPoolSize = newSize
|
||||||
|
kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
|
||||||
|
|
||||||
|
|
||||||
|
// take all of the old kryos and put them in the new one
|
||||||
|
val array = arrayOfNulls<KryoExtra>(oldSize)
|
||||||
|
val count = oldPool.remove(array)
|
||||||
|
|
||||||
|
for (i in 0 until count) {
|
||||||
|
kryoPool.offer(array[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
kryoPool.offer(kryo)
|
kryoPool.offer(kryo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user