Fixed threading issue with read/write global kryo

This commit is contained in:
nathan 2020-09-01 14:38:32 +02:00
parent 0cf510fd7c
commit 9a4c79e445

View File

@ -98,7 +98,7 @@ open class Serialization(private val references: Boolean = true, private val fac
// This is a GLOBAL, single threaded only kryo instance.
// This is to make sure that we have an instance of class registration done correctly and (if not) we are
// are notified on the initial thread (instead of on the network update thread)
private val globalKryo: KryoExtra by lazy { initKryo() }
private val readKryo: KryoExtra by lazy { initKryo() }
// BY DEFAULT, DefaultInstantiatorStrategy() will use ReflectASM
// StdInstantiatorStrategy will create classes bypasses the constructor (which can be useful in some cases) THIS IS A FALLBACK!
@ -323,7 +323,7 @@ open class Serialization(private val references: Boolean = true, private val fac
}
// this will set up the class registration information
initKryo()
val kryo = initKryo()
// now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations based on ID
// in order to get the ID's, these have to be registered with a kryo instance!
@ -393,12 +393,12 @@ open class Serialization(private val references: Boolean = true, private val fac
// RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, implClass, kryoId)
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.clazz, implClass, kryoId)
// we ALSO have to cache the instantiator for these, since these are used to create remote objects
@Suppress("UNCHECKED_CAST")
rmiHolder.idToInstantiator[kryoId] =
globalKryo.instantiatorStrategy.newInstantiatorOf(implClass) as ObjectInstantiator<Any>
kryo.instantiatorStrategy.newInstantiatorOf(implClass) as ObjectInstantiator<Any>
// finally, we must save this ID, to tell the remote connection that their interface serializer must change to support
@ -407,7 +407,7 @@ open class Serialization(private val references: Boolean = true, private val fac
} else if (classRegistration.clazz.isInterface) {
// non-RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, null, kryoId)
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.clazz, null, kryoId)
}
if (kryoId > 65000) {
@ -425,7 +425,7 @@ open class Serialization(private val references: Boolean = true, private val fac
// save this as a byte array (so class registration validation during connection handshake is faster)
val output = AeronOutput()
try {
globalKryo.writeCompressed(logger, output, registrationDetails.toTypedArray())
kryo.writeCompressed(logger, output, registrationDetails.toTypedArray())
} catch (e: Exception) {
logger.error("Unable to write compressed data for registration details", e)
}
@ -568,9 +568,9 @@ open class Serialization(private val references: Boolean = true, private val fac
*
* NOTE: the IFACE must already be registered!!
*/
suspend fun <CONNECTION : Connection> updateKryoIdsForRmi(connection: CONNECTION,
rmiModificationIds: IntArray,
onError: suspend (String) -> Unit) {
fun <CONNECTION : Connection> updateKryoIdsForRmi(connection: CONNECTION,
rmiModificationIds: IntArray,
onError: (String) -> Unit) {
val typeName = connection.endPoint.type.simpleName
// store all of the classes + kryo registration IDs
@ -579,13 +579,12 @@ open class Serialization(private val references: Boolean = true, private val fac
if (!existingRmiIds.contains(it)) {
existingRmiIds.add(it)
// have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on
// a single thread.
// NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct
// because they are set on initialization
val registration = globalKryo.getRegistration(it)
val registration = readKryo.getRegistration(it)
val regMessage = "$typeName-side RMI serializer for registration $it -> ${registration.type}"
if (registration.type.isInterface) {
@ -782,14 +781,10 @@ open class Serialization(private val references: Boolean = true, private val fac
// NOTE: These following functions are ONLY called on a single thread!
fun readMessage(buffer: DirectBuffer, offset: Int, length: Int): Any? {
return globalKryo.read(buffer, offset, length)
return readKryo.read(buffer, offset, length)
}
fun readMessage(buffer: DirectBuffer, offset: Int, length: Int, connection: Connection): Any? {
return globalKryo.read(buffer, offset, length, connection)
}
fun writeMessage(message: Any): AeronOutput {
globalKryo.write(message)
return globalKryo.writerBuffer
return readKryo.read(buffer, offset, length, connection)
}
// /**