Class registration for RMI can now happen "out of order" and can accept with duplicates (which are automatically ignored)

This commit is contained in:
nathan 2020-08-28 10:19:20 +02:00
parent 47919b9214
commit ffa286c913
7 changed files with 130 additions and 107 deletions

View File

@ -16,15 +16,39 @@
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
import dorkbox.util.collections.IdentityMap
internal interface ClassRegistration {
var id: Int
val clazz: Class<*>
val serializer: Serializer<*>?
internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) {
var info: String = ""
fun register(kryo: KryoExtra)
fun register(kryo: KryoExtra, rmiIfaceToImpl: IdentityMap<Class<*>, Class<*>>) {
// we have to check if this registration ALREADY exists for RMI. If so, we ignore it.
// RMI kryo-registration is SPECIFICALLY for impl object ONLY DURING INITIAL REGISTRATION!
// if the registration is modified, then the registration will be the iface
if (clazz.isInterface) {
val impl = rmiIfaceToImpl[clazz]
if (impl != null && kryo.classResolver.getRegistration(impl)?.serializer is RmiClientReverseSerializer) {
// do nothing, because this is already registered for RMI
info = "Removed RMI conflict registration for class ${clazz.name}"
id = -1
return
}
fun info(): String
} else {
if (kryo.classResolver.getRegistration(clazz)?.serializer is RmiClientReverseSerializer) {
// do nothing, because this is already registered for RMI
info = "Removed RMI conflict registration for class ${clazz.name}"
id = -1
return
}
}
fun getInfoArray(): Array<Any>
// otherwise, we are OK to continue to register this
register(kryo)
}
abstract fun register(kryo: KryoExtra)
abstract fun getInfoArray(): Array<Any>
}

View File

@ -17,19 +17,13 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration0(override val clazz: Class<*>,
override val serializer: Serializer<*>) : ClassRegistration {
override var id: Int = 0
internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) {
override fun register(kryo: KryoExtra) {
id = kryo.register(clazz, serializer).id
}
override fun info(): String {
return "Registered $id -> ${clazz.name} using ${serializer?.javaClass?.name}"
info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass?.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, serializer::class.java.name)
return arrayOf(id, clazz.name, serializer!!::class.java.name)
}
}

View File

@ -15,17 +15,10 @@
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration1(override val clazz: Class<*>, override var id: Int) : ClassRegistration {
override val serializer: Serializer<*>? = null
internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz, null, id) {
override fun register(kryo: KryoExtra) {
kryo.register(clazz, id)
}
override fun info(): String {
return "Registered $id -> (specified) ${clazz.name}"
info = "Registered $id -> (specified) ${clazz.name}"
}
override fun getInfoArray(): Array<Any> {

View File

@ -17,16 +17,14 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration2(override val clazz: Class<*>, override val serializer: Serializer<*>, override var id: Int) : ClassRegistration {
internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz, serializer, id) {
override fun register(kryo: KryoExtra) {
kryo.register(clazz, serializer, id)
}
override fun info(): String {
return "Registered $id -> (specified) ${clazz.name} using ${serializer.javaClass.name}"
info = "Registered $id -> (specified) ${clazz.name} using ${serializer!!.javaClass.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, serializer::class.java.name)
return arrayOf(id, clazz.name, serializer!!::class.java.name)
}
}

View File

@ -15,19 +15,11 @@
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal open class ClassRegistration3(override var clazz: Class<*>) : ClassRegistration {
override var id = 0
override val serializer: Serializer<*>? = null
internal open class ClassRegistration3(clazz: Class<*>) : ClassRegistration(clazz) {
override fun register(kryo: KryoExtra) {
val registration = kryo.register(clazz)
id = registration.id
}
override fun info(): String {
return "Registered $id -> ${clazz.name}"
id = kryo.register(clazz).id
info = "Registered $id -> ${clazz.name}"
}
override fun getInfoArray(): Array<Any> {

View File

@ -17,16 +17,38 @@ package dorkbox.network.serialization
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
internal class ClassRegistrationIfaceAndImpl(val ifaceClass: Class<*>,
/**
* This is to manage serializing RMI objects across the wire...
*
* NOTE:
* CLIENT: can never send the iface object, if it's RMI, it will send the java Proxy object instead.
* SERVER: can never send the iface object, it will always send the IMPL object instead (because of how kryo works)
*
* **************************
* NOTE: This works because we TRICK kryo serialization by changing what the kryo ID serializer is on each end of the connection
* **************************
*
* What we do is on the server, REWRITE the kryo ID for the impl so that it will send just the rmi ID instead of the object
* on the client, this SAME kryo ID must have this serializer as well, so the proxy object is re-assembled.
*
* Kryo serialization works by inspecting the field VALUE type, not the field DEFINED type... So if you send an actual object, you must
* register specifically for the implementation object.
*
*
* To recap:
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/
internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>,
val implClass: Class<*>,
override val serializer: RmiClientReverseSerializer) : ClassRegistration {
override var id: Int = 0
override val clazz: Class<*> = ifaceClass // this has to match what is defined on the rmi client
serializer: RmiClientReverseSerializer) : ClassRegistration(ifaceClass, serializer) {
override fun register(kryo: KryoExtra) {
// have to get the ID for the interface (if it exists)
val registration = kryo.classResolver.getRegistration(ifaceClass)
val registration = kryo.classResolver.getRegistration(clazz) // this is ifaceClass, and must match what is defined on the rmi client
if (registration != null) {
id = registration.id
@ -36,14 +58,11 @@ internal class ClassRegistrationIfaceAndImpl(val ifaceClass: Class<*>,
// now register the impl class
id = kryo.register(implClass, serializer).id
}
}
override fun info(): String {
return "Registered $id -> (RMI) ${implClass.name}"
info = "Registered $id -> (RMI) ${implClass.name}"
}
override fun getInfoArray(): Array<Any> {
// the info array has to match for the INTERFACE (not the impl!)
return arrayOf(id, ifaceClass.name, serializer::class.java.name)
return arrayOf(id, clazz.name, serializer!!::class.java.name)
}
}

View File

@ -17,15 +17,12 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.ClassResolver
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.util.IdentityMap
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ping.PingMessage
import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils
@ -41,6 +38,7 @@ import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
import dorkbox.network.rmi.messages.RmiClientSerializer
import dorkbox.os.OS
import dorkbox.util.collections.IdentityMap
import dorkbox.util.serialization.SerializationDefaults
import dorkbox.util.serialization.SerializationManager
import kotlinx.coroutines.channels.Channel
@ -78,39 +76,8 @@ import kotlin.coroutines.Continuation
* an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see
* Kryo#newDefaultSerializer(Class)
*/
open class Serialization(private val references: Boolean,
private val factory: SerializerFactory<*>?) : SerializationManager<DirectBuffer> {
companion object {
/**
* Additionally, this serialization manager will register the entire class+interface hierarchy for an object. If you want to specify a
* serialization scheme for a specific class in an objects hierarchy, you must register that first.
*
* @param references If true, each appearance of an object in the graph after the first is stored as an integer ordinal. When set to true,
* [MapReferenceResolver] is used. This enables references to the same object and cyclic graphs to be serialized,
* but typically adds overhead of one byte per object. (should be true)
*
* @param factory Sets the serializer factory to use when no {@link Kryo#addDefaultSerializer(Class, Class) default serializers} match
* an object's type. Default is {@link ReflectionSerializerFactory} with {@link FieldSerializer}. @see
* Kryo#newDefaultSerializer(Class)
*/
fun DEFAULT(references: Boolean = true, factory: SerializerFactory<*>? = null): Serialization {
val serialization = Serialization(references, factory)
serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?!
// TODO: this is for diffie hellmen handshake stuff!
// serialization.register(IESParameters::class.java, IesParametersSerializer())
// serialization.register(IESWithCipherParameters::class.java, IesWithCipherParametersSerializer())
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer())
// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer())
// serialization.register(Message::class.java) // must use full package name!
return serialization
}
}
open class Serialization(private val references: Boolean = true,
private val factory: SerializerFactory<*>? = null) : SerializationManager<DirectBuffer> {
private lateinit var logger: KLogger
@ -122,7 +89,7 @@ open class Serialization(private val references: Boolean,
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
// Object checking is performed during actual registration.
private val classesToRegister = mutableListOf<ClassRegistration>()
internal lateinit var savedKryoIdsForRmi: IntArray
private lateinit var savedKryoIdsForRmi: IntArray
private lateinit var savedRegistrationDetails: ByteArray
/// RMI things
@ -130,7 +97,10 @@ open class Serialization(private val references: Boolean,
private val rmiIfaceToImpl = IdentityMap<Class<*>, Class<*>>()
private val rmiImplToIface = IdentityMap<Class<*>, Class<*>>()
// This is a GLOBAL, single threaded only kryo instance.
// This is to make sure that we have an instance of class registration done correctly and (if not) we are
// are notified on the initial thread (instead of on the network update thread)
private val globalKryo: KryoExtra by lazy { initKryo() }
// BY DEFAULT, DefaultInstantiatorStrategy() will use ReflectASM
@ -143,7 +113,7 @@ open class Serialization(private val references: Boolean,
private val continuationSerializer = ContinuationSerializer()
private val rmiClientSerializer = RmiClientSerializer()
private val rmiClientReverseSerializer = RmiClientReverseSerializer(rmiImplToIface)
private val rmiClientReverseSerializer = RmiClientReverseSerializer()
// list of already seen client RMI ids (which the server might not have registered as RMI types).
private var existingRmiIds = CopyOnWriteArrayList<Int>()
@ -162,7 +132,6 @@ open class Serialization(private val references: Boolean,
val KRYO_COUNT = 64
kryoPool = Channel(KRYO_COUNT)
}
@Synchronized
@ -175,6 +144,17 @@ open class Serialization(private val references: Boolean,
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
SerializationDefaults.register(kryo)
// serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?!
// TODO: this is for diffie hellmen handshake stuff!
// serialization.register(IESParameters::class.java, IesParametersSerializer())
// serialization.register(IESWithCipherParameters::class.java, IesWithCipherParametersSerializer())
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer())
// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer())
// serialization.register(Message::class.java) // must use full package name!
// RMI stuff!
kryo.register(HandshakeMessage::class.java)
kryo.register(GlobalObjectCreateRequest::class.java)
@ -192,8 +172,9 @@ open class Serialization(private val references: Boolean,
kryo.register(Continuation::class.java, continuationSerializer)
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
// note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again
classesToRegister.forEach { registration ->
registration.register(kryo)
registration.register(kryo, rmiIfaceToImpl)
}
if (factory != null) {
@ -349,20 +330,45 @@ open class Serialization(private val references: Boolean,
fun finishInit(endPointClass: Class<*>) {
this.logger = KotlinLogging.logger(endPointClass.simpleName)
// get all classes/fields with @Rmi field annotation.
// The field type must also be added as an RMI type
// val fieldsWithRmiAnnotation = AnnotationDetector.scanClassPath()
// .forAnnotations(Rmi::class.java)
// .on(ElementType.FIELD)
// .collect(AnnotationDefaults.getField)
//
// fieldsWithRmiAnnotation.forEach { field ->
// val fieldType = field.type
// require(fieldType.isInterface) { "RMI annotated fields must be an interface!" }
//
// logger.debug { "Adding additional @Rmi field annotation for RMI registration" }
//
// // now we add this field type as an RMI serializable
//// registerRmi(fieldType, fieldType)
// }
initialized = true
// initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done
// correctly and (if not) we are are notified on the initial thread (instead of on the network update thread)
// save off the class-resolver, so we can lookup the class <-> id relationships
classResolver = globalKryo.classResolver
// now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations based on ID
// in order to get the ID's, these have to be registered with a kryo instance!
val mergedRegistrations = mutableListOf<ClassRegistration>()
classesToRegister.forEach { registration ->
val id = registration.id
// if the id == -1, it means that this registration was ignored! We don't want to include it -- but we want to log
// that something happened.
if (id == -1) {
logger.debug(registration.info)
return@forEach
}
// if we ALREADY contain this registration (based ONLY on ID), then overwrite the existing one and REMOVE the current one
var found = false
mergedRegistrations.forEachIndexed { index, classRegistration ->
@ -396,7 +402,7 @@ open class Serialization(private val references: Boolean,
if (logger.isDebugEnabled) {
// log the in-order output first
classesToRegister.forEach { classRegistration ->
logger.debug(classRegistration.info())
logger.debug(classRegistration.info)
}
}
@ -415,7 +421,7 @@ open class Serialization(private val references: Boolean,
// RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.ifaceClass, classRegistration.implClass, kryoId)
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, classRegistration.implClass, kryoId)
// we ALSO have to cache the instantiator for these, since these are used to create remote objects
val instantiator = globalKryo.instantiatorStrategy.newInstantiatorOf(classRegistration.implClass)
@ -553,7 +559,6 @@ open class Serialization(private val references: Boolean,
val serializerMatches = serializerServer == serializerClient
if (!serializerMatches) {
// JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff explicitly
when {
serializerServer == rmiClientReverseSerializer::class.java.name -> {
// this is for when the impl is on server, and iface is on client
@ -621,17 +626,16 @@ open class Serialization(private val references: Boolean,
}
/**
* Returns the Kryo class registration ID
* Returns the Kryo class registration ID. This is ALWAYS called on the client!
*/
fun getKryoIdForRmi(interfaceClass: Class<*>): Int {
if (!interfaceClass.isInterface) {
throw KryoException("Can only get the kryo IDs for RMI on an interface!")
}
fun getKryoIdForRmiClient(interfaceClass: Class<*>): Int {
require (interfaceClass.isInterface) { "Can only get the kryo IDs for RMI on an interface!" }
val implClass = rmiIfaceToImpl[interfaceClass]
// if we are the RMI-server, we kryo-register the impl
// if we are the RMI-client, we kryo-register the iface (this is us! This method is only called on the rmi-client)
// for RMI, we store the IMPL class in the class registration -- not the iface!
return classResolver.getRegistration(implClass).id
return classResolver.getRegistration(interfaceClass).id
}
/**
@ -787,8 +791,7 @@ open class Serialization(private val references: Boolean,
}
suspend fun <CONNECTION: Connection> updateKryoIdsForRmi(connection: CONNECTION, rmiModificationIds: IntArray, onError: suspend (String) -> Unit) {
val endPoint = connection.endPoint()
val typeName = endPoint.type.simpleName
val typeName = connection.endPoint.type.simpleName
rmiModificationIds.forEach {
if (!existingRmiIds.contains(it)) {