Added readKryos for streaming

This commit is contained in:
Robinson 2023-09-06 16:17:03 +02:00
parent 464fbadbd1
commit b30b024849
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 27 additions and 11 deletions

View File

@ -145,6 +145,14 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
},
maxSize = OS.optimumNumberOfThreads * 2
)
private val readKryos: Pool<KryoReader<CONNECTION>> = ObjectPool.nonBlockingBounded(
poolObject = object : BoundedPoolObject<KryoReader<CONNECTION>>() {
override fun newInstance(): KryoReader<CONNECTION> {
return newReadKryo()
}
},
maxSize = OS.optimumNumberOfThreads * 2
)
private var initialized = atomic(false)
@ -164,7 +172,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
// StdInstantiatorStrategy will create classes bypasses the constructor (which can be useful in some cases) THIS IS A FALLBACK!
private val instantiatorStrategy = DefaultInstantiatorStrategy(StdInstantiatorStrategy())
private val methodRequestSerializer = MethodRequestSerializer<CONNECTION>(methodCache) // note: the methodCache is configured BEFORE anything reads from it!
private val methodRequestSerializer = MethodRequestSerializer<CONNECTION>(methodCache) // the methodCache is configured BEFORE anything reads from it!
private val methodResponseSerializer = MethodResponseSerializer()
private val continuationSerializer = ContinuationSerializer()
@ -294,9 +302,9 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
}
/**
* NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
* (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the
* server, thus preventing them from trying to probe the server data structures.
* When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
* (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the
* server, thus preventing them from trying to probe the server data structures.
*
* @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo
*/
@ -761,15 +769,23 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
return newRegistrations
}
internal inline fun <T> withKryo(kryoAccess: KryoWriter<CONNECTION>.() -> T): T {
val kryo = writeKryos.take()
try {
return kryoAccess(kryo)
} finally {
writeKryos.put(kryo)
}
fun take(): KryoWriter<CONNECTION> {
return writeKryos.take()
}
fun put(kryo: KryoWriter<CONNECTION>) {
writeKryos.put(kryo)
}
fun takeRead(): KryoReader<CONNECTION> {
return readKryos.take()
}
fun putRead(kryo: KryoReader<CONNECTION>) {
readKryos.put(kryo)
}
/**
* NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
*