From 9ec8f4efe3bd0dd6f81a71a86b4a6309ba77d097 Mon Sep 17 00:00:00 2001 From: nathan Date: Tue, 1 Sep 2020 09:12:30 +0200 Subject: [PATCH] Better support for sending RMI objects --- src/dorkbox/network/Client.kt | 43 +- src/dorkbox/network/Server.kt | 20 +- src/dorkbox/network/connection/Connection.kt | 21 +- src/dorkbox/network/connection/EndPoint.kt | 4 +- .../network/rmi/RmiManagerConnections.kt | 23 +- src/dorkbox/network/rmi/RmiManagerGlobal.kt | 17 +- src/dorkbox/network/rmi/RmiObjectCache.kt | 29 +- src/dorkbox/network/rmi/TimeoutException.kt | 4 - .../rmi/messages/RmiClientSerializer.kt | 2 + ...seSerializer.kt => RmiServerSerializer.kt} | 54 +- .../serialization/ClassRegistration.kt | 74 ++- .../serialization/ClassRegistration0.kt | 2 +- .../serialization/ClassRegistrationForRmi.kt | 136 +++++ .../network/serialization/RmiHolder.kt | 18 + .../network/serialization/Serialization.kt | 502 ++++++++---------- ...errideAndProxyTest.kt => RmiNestedTest.kt} | 203 ++++++- 16 files changed, 759 insertions(+), 393 deletions(-) rename src/dorkbox/network/rmi/messages/{RmiClientReverseSerializer.kt => RmiServerSerializer.kt} (64%) create mode 100644 src/dorkbox/network/serialization/ClassRegistrationForRmi.kt create mode 100644 src/dorkbox/network/serialization/RmiHolder.kt rename test/dorkboxTest/network/rmi/{RmiOverrideAndProxyTest.kt => RmiNestedTest.kt} (54%) diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 0b31379e..00e5cbe1 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -72,7 +72,7 @@ open class Client(config: Configuration = Configuration private val previousClosedConnectionActivity: Long = 0 - private val rmiConnectionSupport = RmiManagerConnections(logger, listenerManager, rmiGlobalSupport, serialization) + private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization) init { // have to do some basic validation of our configuration @@ -498,8 +498,17 @@ open class Client(config: Configuration = Configuration * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveObject(`object`: Any): Int { - return rmiConnectionSupport.saveImplObject(`object`) + val rmiId = rmiConnectionSupport.saveImplObject(`object`) + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + return rmiId + } + + return rmiId } /** @@ -519,8 +528,15 @@ open class Client(config: Configuration = Configuration * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveObject(`object`: Any, objectId: Int): Boolean { - return rmiConnectionSupport.saveImplObject(`object`, objectId) + val success = rmiConnectionSupport.saveImplObject(`object`, objectId) + if (!success) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + return success } /** @@ -546,7 +562,7 @@ open class Client(config: Configuration = Configuration val kryoId = serialization.getKryoIdForRmiClient(Iface::class.java) @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE") - return rmiConnectionSupport.getRemoteObject(getConnection(), kryoId, objectId, Iface::class.java) + return rmiConnectionSupport.getProxyObject(getConnection(), kryoId, objectId, Iface::class.java) } /** @@ -630,8 +646,15 @@ open class Client(config: Configuration = Configuration * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveGlobalObject(`object`: Any): Int { - return rmiGlobalSupport.saveImplObject(`object`) + val rmiId = rmiGlobalSupport.saveImplObject(`object`) + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + return rmiId } /** @@ -651,8 +674,16 @@ open class Client(config: Configuration = Configuration * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveGlobalObject(`object`: Any, objectId: Int): Boolean { - return rmiGlobalSupport.saveImplObject(`object`, objectId) + val success = rmiGlobalSupport.saveImplObject(`object`, objectId) + if (!success) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + + return success } /** diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 6df17fc4..12c6b70b 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -20,6 +20,7 @@ import dorkbox.netUtil.IPv6 import dorkbox.network.aeron.server.ServerException import dorkbox.network.connection.Connection import dorkbox.network.connection.EndPoint +import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.UdpMediaDriverConnection import dorkbox.network.connection.connectionType.ConnectionProperties import dorkbox.network.connection.connectionType.ConnectionRule @@ -530,8 +531,16 @@ open class Server(config: ServerConfiguration = ServerC * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveGlobalObject(`object`: Any): Int { - return rmiGlobalSupport.saveImplObject(`object`) + val rmiId = rmiGlobalSupport.saveImplObject(`object`) + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + return rmiId + } + return rmiId } /** @@ -551,7 +560,14 @@ open class Server(config: ServerConfiguration = ServerC * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveGlobalObject(`object`: Any, objectId: Int): Boolean { - return rmiGlobalSupport.saveImplObject(`object`, objectId) + val success = rmiGlobalSupport.saveImplObject(`object`, objectId) + if (!success) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + return success } } diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 2d0879ac..8c2bd4af 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -438,8 +438,16 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveObject(`object`: Any): Int { - return rmiConnectionSupport.saveImplObject(`object`) + val rmiId = rmiConnectionSupport.saveImplObject(`object`) + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.value?.notifyError(this, exception) + } + + return rmiId } /** @@ -459,8 +467,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * * @see RemoteObject */ + @Suppress("DuplicatedCode") suspend fun saveObject(`object`: Any, objectId: Int): Boolean { - return rmiConnectionSupport.saveImplObject(`object`, objectId) + val success = rmiConnectionSupport.saveImplObject(`object`, objectId) + if (!success) { + val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.value?.notifyError(this, exception) + } + return success } /** @@ -487,7 +502,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { val kryoId = endPoint.serialization.getKryoIdForRmiClient(Iface::class.java) @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE") - return rmiConnectionSupport.getRemoteObject(this, kryoId, objectId, Iface::class.java) + return rmiConnectionSupport.getProxyObject(this, kryoId, objectId, Iface::class.java) } /** diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index ebf0fd41..b19cf512 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -141,7 +141,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // we only want one instance of these created. These will be called appropriately val settingsStore: SettingsStore - internal val rmiGlobalSupport = RmiManagerGlobal(logger, listenerManager, actionDispatch, config.serialization) + internal val rmiGlobalSupport = RmiManagerGlobal(logger, actionDispatch, config.serialization) init { logger.error("NETWORK STACK IS ONLY IPV4 AT THE MOMENT. IPV6 is in progress!") @@ -337,7 +337,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A * from a "global" context */ internal open fun getRmiConnectionSupport() : RmiManagerConnections { - return RmiManagerConnections(logger, listenerManager, rmiGlobalSupport, serialization) + return RmiManagerConnections(logger, rmiGlobalSupport, serialization) } /** diff --git a/src/dorkbox/network/rmi/RmiManagerConnections.kt b/src/dorkbox/network/rmi/RmiManagerConnections.kt index ff5c02cf..487eb7c6 100644 --- a/src/dorkbox/network/rmi/RmiManagerConnections.kt +++ b/src/dorkbox/network/rmi/RmiManagerConnections.kt @@ -24,11 +24,9 @@ import dorkbox.network.serialization.Serialization import dorkbox.util.collections.LockFreeIntMap import mu.KLogger -internal class RmiManagerConnections( - logger: KLogger, - listenerManager: ListenerManager, - val rmiGlobalSupport: RmiManagerGlobal, - private val serialization: Serialization) : RmiObjectCache(listenerManager, logger) { +internal class RmiManagerConnections(logger: KLogger, + val rmiGlobalSupport: RmiManagerGlobal, + private val serialization: Serialization) : RmiObjectCache(logger) { // It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections! private val proxyObjects = LockFreeIntMap() @@ -51,20 +49,20 @@ internal class RmiManagerConnections( /** * on the connection+client to get a connection-specific remote object (that exists on the server/client) */ - fun getRemoteObject(connection: Connection, kryoId: Int, objectId: Int, interfaceClass: Class): Iface { + fun getProxyObject(connection: Connection, kryoId: Int, rmiId: Int, interfaceClass: Class): Iface { require(interfaceClass.isInterface) { "iface must be an interface." } // so we can just instantly create the proxy object (or get the cached one) - var proxyObject = getProxyObject(objectId) + var proxyObject = getProxyObject(rmiId) if (proxyObject == null) { proxyObject = RmiManagerGlobal.createProxyObject(false, connection, serialization, rmiGlobalSupport.rmiResponseManager, - objectId, kryoId, + rmiId, interfaceClass) - saveProxyObject(objectId, proxyObject) + saveProxyObject(rmiId, proxyObject) } // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! @@ -97,30 +95,29 @@ internal class RmiManagerConnections( val callbackId = RmiUtils.unpackLeft(message.packedIds) val kryoId = RmiUtils.unpackRight(message.packedIds) val objectParameters = message.objectParameters + val serialization = endPoint.serialization // We have to lookup the iface, since the proxy object requires it - val implObject = endPoint.serialization.createRmiObject(kryoId, objectParameters) + val implObject = serialization.createRmiObject(kryoId, objectParameters) val response = if (implObject is Exception) { // whoops! ListenerManager.cleanStackTrace(implObject) endPoint.listenerManager.notifyError(connection, implObject) - // we send the message ANYWAYS, because the client needs to know it did NOT succeed! ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI)) } else { val rmiId = saveImplObject(implObject) - if (rmiId == RemoteObjectStorage.INVALID_RMI) { val exception = NullPointerException("Trying to create an RMI object with the INVALID_RMI id!!") ListenerManager.cleanStackTrace(exception) endPoint.listenerManager.notifyError(connection, exception) } - // we send the message ANYWAYS, because the client needs to know it did NOT succeed! ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId)) } + // we send the message ALWAYS, because the client needs to know it worked or not connection.send(response) } diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index 00bdb903..87d4703e 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -32,11 +32,9 @@ import mu.KLogger import java.lang.reflect.Proxy import java.util.* -internal class RmiManagerGlobal( - logger: KLogger, - listenerManager: ListenerManager, - actionDispatch: CoroutineScope, - internal val serialization: Serialization) : RmiObjectCache(listenerManager, logger) { +internal class RmiManagerGlobal(logger: KLogger, + actionDispatch: CoroutineScope, + internal val serialization: Serialization) : RmiObjectCache(logger) { companion object { /** @@ -208,7 +206,7 @@ internal class RmiManagerGlobal( /** * called on "server" */ - onGlobalObjectCreateRequest(endPoint, connection, message, logger) + onGlobalObjectCreateRequest(endPoint, connection, message) } is GlobalObjectCreateResponse -> { /** @@ -367,14 +365,17 @@ internal class RmiManagerGlobal( /** * called on "server" */ - private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint, connection: CONNECTION, message: GlobalObjectCreateRequest, logger: KLogger) { + private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint, + connection: CONNECTION, + message: GlobalObjectCreateRequest) { val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) val objectParameters = message.objectParameters + val serialization = endPoint.serialization // We have to lookup the iface, since the proxy object requires it - val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters) + val implObject = serialization.createRmiObject(interfaceClassId, objectParameters) val response = if (implObject is Exception) { // whoops! diff --git a/src/dorkbox/network/rmi/RmiObjectCache.kt b/src/dorkbox/network/rmi/RmiObjectCache.kt index cfebba61..a1fb273b 100644 --- a/src/dorkbox/network/rmi/RmiObjectCache.kt +++ b/src/dorkbox/network/rmi/RmiObjectCache.kt @@ -15,8 +15,6 @@ */ package dorkbox.network.rmi -import dorkbox.network.connection.Connection -import dorkbox.network.connection.ListenerManager import mu.KLogger /** @@ -25,39 +23,22 @@ import mu.KLogger * The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there * would be conflicts in the data structure) */ -internal open class RmiObjectCache(private val listenerManager: ListenerManager, logger: KLogger) { +internal open class RmiObjectCache(logger: KLogger) { private val implObjects = RemoteObjectStorage(logger) - /** * @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted) */ - suspend fun saveImplObject(rmiObject: Any): Int { - val rmiId = implObjects.register(rmiObject) - - if (rmiId == RemoteObjectStorage.INVALID_RMI) { - val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated") - ListenerManager.cleanStackTrace(exception) - listenerManager.notifyError(exception) - } - - return rmiId + fun saveImplObject(rmiObject: Any): Int { + return implObjects.register(rmiObject) } /** * @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted) */ - suspend fun saveImplObject(rmiObject: Any, objectId: Int): Boolean { - val success = implObjects.register(rmiObject, objectId) - - if (!success) { - val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated") - ListenerManager.cleanStackTrace(exception) - listenerManager.notifyError(exception) - } - - return success + fun saveImplObject(rmiObject: Any, objectId: Int): Boolean { + return implObjects.register(rmiObject, objectId) } fun getImplObject(rmiId: Int): T? { diff --git a/src/dorkbox/network/rmi/TimeoutException.kt b/src/dorkbox/network/rmi/TimeoutException.kt index 67768ebd..bf6b0faf 100644 --- a/src/dorkbox/network/rmi/TimeoutException.kt +++ b/src/dorkbox/network/rmi/TimeoutException.kt @@ -49,8 +49,4 @@ class TimeoutException : IOException { constructor(message: String?, cause: Throwable?) : super(message, cause) {} constructor(message: String?) : super(message) {} constructor(cause: Throwable?) : super(cause) {} - - companion object { - private const val serialVersionUID = -3526277240277423682L - } } diff --git a/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt b/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt index 5821e959..eeca2f6e 100644 --- a/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt @@ -47,6 +47,8 @@ import java.lang.reflect.Proxy * rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server) * rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client) * + * rmi-server MUST registerRmi both the iface+impl + * * During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer. * If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID */ diff --git a/src/dorkbox/network/rmi/messages/RmiClientReverseSerializer.kt b/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt similarity index 64% rename from src/dorkbox/network/rmi/messages/RmiClientReverseSerializer.kt rename to src/dorkbox/network/rmi/messages/RmiServerSerializer.kt index 44fe9b65..a9723a75 100644 --- a/src/dorkbox/network/rmi/messages/RmiClientReverseSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt @@ -40,6 +40,7 @@ import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import dorkbox.network.rmi.RemoteObjectStorage import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.Serialization.Companion.INVALID_KRYO_ID /** * This is to manage serializing RMI objects across the wire... @@ -65,33 +66,60 @@ import dorkbox.network.serialization.KryoExtra * rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server) * rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client) * + * rmi-server MUST registerRmi both the iface+impl + * * During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer. * If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID */ -class RmiClientReverseSerializer : Serializer(false) { +class RmiServerSerializer : Serializer(false) { override fun write(kryo: Kryo, output: Output, `object`: Any) { val kryoExtra = kryo as KryoExtra - val rmiConnectionSupport = kryoExtra.connection.rmiConnectionSupport - // have to write what the rmi ID is ONLY. We have to find out if it's a global object or connection scope object! + val connection = kryoExtra.connection + val rmiConnectionSupport = connection.rmiConnectionSupport - // check connection scope first - var id = rmiConnectionSupport.getId(`object`) + // have to write what the rmi ID is ONLY. A remote object sent via a connection IS ONLY a connection-scope object! - // check global scope second - if (id == RemoteObjectStorage.INVALID_RMI) { - id = rmiConnectionSupport.rmiGlobalSupport.getId(`object`) + // check if we have saved it already + var rmiId = rmiConnectionSupport.getId(`object`) + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + // this means we have to save it. This object is cached so we can have an association between rmiID <-> object + rmiId = rmiConnectionSupport.saveImplObject(`object`) + + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + connection.logger.error("Unable to save $`object` for use as RMI!") + } } - output.writeInt(id, true) + output.writeInt(rmiId, true) } - override fun read(kryo: Kryo, input: Input, iface: Class<*>): Any? { + override fun read(kryo: Kryo, input: Input, interfaceClass: Class<*>): Any? { val kryoExtra = kryo as KryoExtra - val objectID = input.readInt(true) + val rmiId = input.readInt(true) val connection = kryoExtra.connection - val kryoId = connection.endPoint.serialization.getKryoIdForRmiClient(iface) - return connection.rmiConnectionSupport.getRemoteObject(connection, kryoId, objectID, iface) + val serialization = connection.endPoint.serialization + + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + throw NullPointerException("RMI ID is invalid. Unable to use proxy object!") + } + + // the rmi-server will have iface+impl id's + // the rmi-client will have iface id's + + return if (interfaceClass.isInterface) { + // normal case. RMI only on 1 side + val kryoId = serialization.rmiHolder.ifaceToId[interfaceClass]!! + require(kryoId != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" } + connection.rmiConnectionSupport.getProxyObject(connection, kryoId, rmiId, interfaceClass) + } else { + // BI-DIRECTIONAL RMI -- THIS IS NOT NORMAL! + // this won't be an interface. It will be an impl (because of how RMI is setup) + val kryoId = serialization.rmiHolder.implToId[interfaceClass]!! + require(kryoId != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" } + val iface = serialization.rmiHolder.idToIface[kryoId] + connection.rmiConnectionSupport.getProxyObject(connection, kryoId, rmiId, iface) + } } } diff --git a/src/dorkbox/network/serialization/ClassRegistration.kt b/src/dorkbox/network/serialization/ClassRegistration.kt index 78e48bb0..dfc0a746 100644 --- a/src/dorkbox/network/serialization/ClassRegistration.kt +++ b/src/dorkbox/network/serialization/ClassRegistration.kt @@ -16,39 +16,61 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Serializer -import dorkbox.network.rmi.messages.RmiClientReverseSerializer -import dorkbox.util.collections.IdentityMap +import dorkbox.network.rmi.messages.RmiServerSerializer internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) { + companion object { + const val IGNORE_REGISTRATION = -1 + } + var info: String = "" - fun register(kryo: KryoExtra, rmiIfaceToImpl: IdentityMap, Class<*>>) { - // we have to check if this registration ALREADY exists for RMI. If so, we ignore it. - // RMI kryo-registration is SPECIFICALLY for impl object ONLY DURING INITIAL REGISTRATION! - // if the registration is modified, then the registration will be the iface - if (clazz.isInterface) { - val impl = rmiIfaceToImpl[clazz] - if (impl != null && kryo.classResolver.getRegistration(impl)?.serializer is RmiClientReverseSerializer) { - // do nothing, because this is already registered for RMI - info = "Removed RMI conflict registration for class ${clazz.name}" - id = -1 - return - } + /** + * we have to check if this registration ALREADY exists for RMI. + * + * If so, we ignore it - any IFACE or IMPL that already has been assigned to an RMI serializer, *MUST* remain an RMI serializer + * If this class registration will EVENTUALLY be for RMI, then [ClassRegistrationForRmi] will reassign the serializer + */ + open fun register(kryo: KryoExtra, rmi: RmiHolder) { + val savedKryoId: Int? = rmi.ifaceToId[clazz] - } else { - if (kryo.classResolver.getRegistration(clazz)?.serializer is RmiClientReverseSerializer) { - // do nothing, because this is already registered for RMI - info = "Removed RMI conflict registration for class ${clazz.name}" - id = -1 - return - } - } + var overriddenSerializer: Serializer? = null - // otherwise, we are OK to continue to register this - register(kryo) - } + // did we already process this class? We permit overwriting serializers, etc! + if (savedKryoId != null) { + overriddenSerializer = kryo.classResolver.getRegistration(savedKryoId)?.serializer + when (overriddenSerializer) { + is RmiServerSerializer -> { + // do nothing, because this is ALREADY registered for RMI + info = if (serializer == null) { + "CONFLICTED $savedKryoId -> (RMI) Ignored duplicate registration for ${clazz.name}" + } else { + "CONFLICTED $savedKryoId -> (RMI) Ignored duplicate registration for ${clazz.name} (${serializer.javaClass.name})" + } + // mark this for later, so we don't try to do something with it + id = IGNORE_REGISTRATION + return + } + else -> { + // mark that this was overridden! + } + } + } + + // otherwise, we are OK to continue to register this + register(kryo) + + if (overriddenSerializer != null) { + info = "$info (Replaced $overriddenSerializer)" + } + + // now, we want to save the relationship between classes and kryoId + rmi.idToIface[id] = clazz + rmi.ifaceToId[clazz] = id + } + + open fun register(kryo: KryoExtra) {} - abstract fun register(kryo: KryoExtra) abstract fun getInfoArray(): Array } diff --git a/src/dorkbox/network/serialization/ClassRegistration0.kt b/src/dorkbox/network/serialization/ClassRegistration0.kt index a7ed914d..ec1cb1ac 100644 --- a/src/dorkbox/network/serialization/ClassRegistration0.kt +++ b/src/dorkbox/network/serialization/ClassRegistration0.kt @@ -20,7 +20,7 @@ import com.esotericsoftware.kryo.Serializer internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) { override fun register(kryo: KryoExtra) { id = kryo.register(clazz, serializer).id - info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass?.name}" + info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass.name}" } override fun getInfoArray(): Array { diff --git a/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt b/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt new file mode 100644 index 00000000..8e2b894d --- /dev/null +++ b/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt @@ -0,0 +1,136 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.serialization + +import dorkbox.network.rmi.messages.RmiServerSerializer + +/** + * This is to manage serializing RMI objects across the wire... + * + * NOTE: + * CLIENT: can never send the iface object, if it's RMI, it will send the java Proxy object instead. + * SERVER: can never send the iface object, it will always send the IMPL object instead (because of how kryo works) + * + * ************************** + * NOTE: This works because we TRICK kryo serialization by changing what the kryo ID serializer is on each end of the connection + * ************************** + * + * What we do is on the server, REWRITE the kryo ID for the impl so that it will send just the rmi ID instead of the object + * on the client, this SAME kryo ID must have this serializer as well, so the proxy object is re-assembled. + * + * Kryo serialization works by inspecting the field VALUE type, not the field DEFINED type... So if you send an actual object, you must + * register specifically for the implementation object. + * + * + * To recap: + * rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server) + * rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client) + * + * rmi-server MUST registerRmi both the iface+impl + * + * During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer. + * If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID + */ +internal class ClassRegistrationForRmi(ifaceClass: Class<*>, + val implClass: Class<*>, + serializer: RmiServerSerializer) : ClassRegistration(ifaceClass, serializer) { + /** + * In general: + * + * ALL kryo registrations must be for IMPL, because of how kryo works, kryo can ONLY send IMPL objects, and thus serialization + * can only be for IMPL objects. (we can write fields for an IFACE, but when constructing an object, it must be concrete) + * + * To recap how RMI works: + * rmi-client works ONLY with proxy objects + * rmi-server works ONLY with impl objects + * **NOTE: it is possible to have both ends of a connection be BOTH the rmi-client+server (for bi-directional RMI) + * + * + * #### + * To SEND/RECEIVE an RMI object + * + * for rmi-client -> {object} -> rmi-server + * REQUIRES: + * rmi-client: (java proxy object -> bytes) + * register InvocationHandler class with RmiClientSerializer + * register IFACE class + * rmi-server: (bytes -> java impl object) + * register InvocationHandler class with RmiClientSerializer + * register IMPL class + * able to lookup rmiID -> IMPL object + * + * rmi-client -> send proxy object -> {{kryo: InvocationHandler, sends rmiID}} + * {{network}} + * {{kryo: InvocationHandler looks up impl object based on ID}} -> rmi-server receives IMPL object + * + * + * for rmi-server -> {object} -> rmi-client + * REQUIRES: + * rmi-server: (java impl object -> bytes) + * register IMPL object class with RmiServerSerializer + * able to lookup IMPL object -> rmiID + * rmi-client: (bytes -> java proxy object) + * register IFACE class with RmiServerSerializer + * able to lookup/create rmiID -> proxy object + * + * rmi-server -> send impl object -> {{kryo: RmiServerSerializer, sends rmiId}} + * {{network}} + * {{kryo: RmiServerSerializer read rmiID, looks up/creates proxy object using IFACE}} -> rmi-client receives proxy object + * + * + * Requirements for all cases of RMI + * rmi-client: + * send: register InvocationHandler class with RmiClientSerializer + * receive: register IMPL object class with RmiServerSerializer + * lookup IMPL object -> rmiID + * + * rmi-server: + * receive: register InvocationHandler class with RmiClientSerializer + * lookup rmiID -> IMPL object + * send: register IMPL object class with RmiServerSerializer + * lookup IMPL object -> rmiID + */ + override fun register(kryo: KryoExtra, rmi: RmiHolder) { + // we override this, because we ALWAYS will call our RMI registration! + + // have to get the ID for the interface (if it exists) + val registration = kryo.classResolver.getRegistration(clazz) // this is ifaceClass, and must match what is defined on the rmi client + if (registration != null) { + id = registration.id + + // override that registration + kryo.register(implClass, serializer, id) + } else { + // now register the impl class + id = kryo.register(implClass, serializer).id + } + info = "Registered $id -> (RMI) ${implClass.name}" + + + // now, we want to save the relationship between classes and kryoId + rmi.ifaceToId[clazz] = id + rmi.idToIface[id] = clazz + + // we have to know what the IMPL class is so we can create it for a "createObject" RMI command + rmi.implToId[implClass] = id + rmi.idToImpl[id] = implClass + } + + override fun getInfoArray(): Array { + // the info array has to match for the INTERFACE (not the impl!) + return arrayOf(id, clazz.name, serializer!!::class.java.name) + } +} diff --git a/src/dorkbox/network/serialization/RmiHolder.kt b/src/dorkbox/network/serialization/RmiHolder.kt new file mode 100644 index 00000000..6d2dcdf3 --- /dev/null +++ b/src/dorkbox/network/serialization/RmiHolder.kt @@ -0,0 +1,18 @@ +package dorkbox.network.serialization + +import org.agrona.collections.Int2ObjectHashMap +import org.agrona.collections.Object2IntHashMap +import org.objenesis.instantiator.ObjectInstantiator + +/// RMI things +class RmiHolder { + val idToInstantiator : Int2ObjectHashMap> = Int2ObjectHashMap() + + // note: ONLY RegisterRmi (iface+impl) will have their impl class info saved! + + val ifaceToId = Object2IntHashMap>(Serialization.INVALID_KRYO_ID) + val implToId = Object2IntHashMap>(Serialization.INVALID_KRYO_ID) + + val idToImpl = Int2ObjectHashMap>() + val idToIface = Int2ObjectHashMap>() +} diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 1b4ce157..8ec96aae 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -15,7 +15,7 @@ */ package dorkbox.network.serialization -import com.esotericsoftware.kryo.ClassResolver +import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.SerializerFactory @@ -35,13 +35,12 @@ import dorkbox.network.rmi.messages.MethodRequest import dorkbox.network.rmi.messages.MethodRequestSerializer import dorkbox.network.rmi.messages.MethodResponse import dorkbox.network.rmi.messages.MethodResponseSerializer -import dorkbox.network.rmi.messages.RmiClientReverseSerializer import dorkbox.network.rmi.messages.RmiClientSerializer +import dorkbox.network.rmi.messages.RmiServerSerializer import dorkbox.os.OS -import dorkbox.util.collections.IdentityMap import dorkbox.util.serialization.SerializationDefaults import dorkbox.util.serialization.SerializationManager -import kotlinx.coroutines.channels.Channel +import kotlinx.atomicfu.atomic import kotlinx.coroutines.runBlocking import mu.KLogger import mu.KotlinLogging @@ -56,6 +55,7 @@ import java.lang.reflect.InvocationHandler import java.util.concurrent.CopyOnWriteArrayList import kotlin.coroutines.Continuation + /** * Threads reading/writing at the same time a single instance of kryo. it is possible to use a single kryo with the use of * synchronize, however - that defeats the point of having multi-threaded serialization. @@ -76,14 +76,17 @@ import kotlin.coroutines.Continuation * an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see * Kryo#newDefaultSerializer(Class) */ -open class Serialization(private val references: Boolean = true, - private val factory: SerializerFactory<*>? = null) : SerializationManager { +open class Serialization(private val references: Boolean = true, private val factory: SerializerFactory<*>? = null) : SerializationManager { + + companion object { + // -2 is the same value that kryo uses for invalid id's + const val INVALID_KRYO_ID = -2 + } private lateinit var logger: KLogger - private var initialized = false - private val kryoPool: Channel - lateinit var classResolver: ClassResolver + private var initialized = atomic(false) + private val kryoPool = MultithreadConcurrentQueue(1024) // reasonable size of available kryo's // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. @@ -92,12 +95,6 @@ open class Serialization(private val references: Boolean = true, private lateinit var savedKryoIdsForRmi: IntArray private lateinit var savedRegistrationDetails: ByteArray - /// RMI things - private val rmiIfaceToInstantiator : Int2ObjectHashMap> = Int2ObjectHashMap() - private val rmiIfaceToImpl = IdentityMap, Class<*>>() - private val rmiImplToIface = IdentityMap, Class<*>>() - - // This is a GLOBAL, single threaded only kryo instance. // This is to make sure that we have an instance of class registration done correctly and (if not) we are // are notified on the initial thread (instead of on the network update thread) @@ -109,32 +106,161 @@ open class Serialization(private val references: Boolean = true, private val methodRequestSerializer = MethodRequestSerializer() private val methodResponseSerializer = MethodResponseSerializer() - private val continuationSerializer = ContinuationSerializer() private val rmiClientSerializer = RmiClientSerializer() - private val rmiClientReverseSerializer = RmiClientReverseSerializer() + private val rmiServerSerializer = RmiServerSerializer() + + val rmiHolder = RmiHolder() // list of already seen client RMI ids (which the server might not have registered as RMI types). private var existingRmiIds = CopyOnWriteArrayList() - - // the purpose of the method cache, is to accelerate looking up methods for specific class private val methodCache : Int2ObjectHashMap> = Int2ObjectHashMap() - // reflectASM doesn't work on android private val useAsm = !OS.isAndroid() - init { - // reasonable size of available kryo's before coroutines are suspended during read/write - val KRYO_COUNT = 64 + /** + * Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer]. + * If the class is already registered, the existing entry is updated with the new serializer. + * + * + * Registering a primitive also affects the corresponding primitive wrapper. + * + * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this + * method. + * + * The order must be the same at deserialization as it was for serialization. + * + * This must happen before the creation of the client/server + */ + override fun register(clazz: Class): Serialization { + require(!initialized.value) { "Serialization 'register(class)' cannot happen after client/server initialization!" } - kryoPool = Channel(KRYO_COUNT) +// // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather +// // with object types... EVEN IF THERE IS A SERIALIZER +// require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." } + + classesToRegister.add(ClassRegistration3(clazz)) + return this } + /** + * Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID + * is already in use by a different type, an exception is thrown. + * + * + * Registering a primitive also affects the corresponding primitive wrapper. + * + * IDs must be the same at deserialization as they were for serialization. + * + * This must happen before the creation of the client/server + * + * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but + * these IDs can be repurposed. + */ + override fun register(clazz: Class, id: Int): Serialization { + require(!initialized.value) { "Serialization 'register(Class, int)' cannot happen after client/server initialization!" } + + // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather + // with object types... EVEN IF THERE IS A SERIALIZER + require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." } + + classesToRegister.add(ClassRegistration1(clazz, id)) + return this + } + + /** + * Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered, + * the existing entry is updated with the new serializer. + * + * + * Registering a primitive also affects the corresponding primitive wrapper. + * + * + * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this + * method. The order must be the same at deserialization as it was for serialization. + */ @Synchronized + override fun register(clazz: Class, serializer: Serializer): Serialization { + require(!initialized.value) { "Serialization 'register(Class, Serializer)' cannot happen after client/server initialization!" } + + // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather + // with object types... EVEN IF THERE IS A SERIALIZER + require(!clazz.isInterface) { "Cannot register '${clazz.name}' with a serializer. It must be an implementation." } + + classesToRegister.add(ClassRegistration0(clazz, serializer)) + return this + } + + /** + * Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is + * overwritten. If the ID is already in use by a different type, an exception is thrown. + * + * + * Registering a primitive also affects the corresponding primitive wrapper. + * + * + * IDs must be the same at deserialization as they were for serialization. + * + * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but + * these IDs can be repurposed. + */ + @Synchronized + override fun register(clazz: Class, serializer: Serializer, id: Int): Serialization { + require(!initialized.value) { "Serialization 'register(Class, Serializer, int)' cannot happen after client/server initialization!" } + + // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather + // with object types... EVEN IF THERE IS A SERIALIZER + require(!clazz.isInterface) { "Cannot register '${clazz.name}'. It must be an implementation." } + + classesToRegister.add(ClassRegistration2(clazz, serializer, id)) + return this + } + + /** + * There is additional overhead to using RMI. + * + * This enables a "remote endpoint" to access methods and create objects (RMI) for this endpoint. + * + * This is NOT bi-directional, and this endpoint cannot access or create remote objects on the "remote client". + * + * @throws IllegalArgumentException if the iface/impl have previously been overridden + */ + @Synchronized + fun registerRmi(ifaceClass: Class, implClass: Class): Serialization { + require(!initialized.value) { "Serialization 'registerRmi(Class, Class)' cannot happen after client/server initialization!" } + + require(ifaceClass.isInterface) { "Cannot register an implementation for RMI access. It must be an interface." } + require(!implClass.isInterface) { "Cannot register an interface for RMI implementations. It must be an implementation." } + + classesToRegister.add(ClassRegistrationForRmi(ifaceClass, implClass, rmiServerSerializer)) + return this + } + + /** + * NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client + * (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the + * server, thus preventing them from trying to probe the server data structures. + * + * @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo + */ + fun getKryoRegistrationDetails(): ByteArray { + return savedRegistrationDetails + } + + /** + * @return the details of all registration IDs for RMI iface serializer rewrites + */ + fun getKryoRmiIds(): IntArray { + return savedKryoIdsForRmi + } + + /** + * called as the first think inside [finishInit] + */ private fun initKryo(): KryoExtra { val kryo = KryoExtra(methodCache) @@ -174,7 +300,7 @@ open class Serialization(private val references: Boolean = true, // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) // note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again classesToRegister.forEach { registration -> - registration.register(kryo, rmiIfaceToImpl) + registration.register(kryo, rmiHolder) } if (factory != null) { @@ -184,177 +310,20 @@ open class Serialization(private val references: Boolean = true, return kryo } - - - /** - * Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer]. - * If the class is already registered, the existing entry is updated with the new serializer. - * - * - * Registering a primitive also affects the corresponding primitive wrapper. - * - * - * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this - * method. The order must be the same at deserialization as it was for serialization. - */ - @Synchronized - override fun register(clazz: Class): Serialization { - if (initialized) { - logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class) call.") - } else { - classesToRegister.add(ClassRegistration3(clazz)) - } - - return this - } - - /** - * Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID - * is already in use by a different type, an exception is thrown. - * - * - * Registering a primitive also affects the corresponding primitive wrapper. - * - * IDs must be the same at deserialization as they were for serialization. - * - * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but - * these IDs can be repurposed. - */ - @Synchronized - override fun register(clazz: Class, id: Int): Serialization { - if (initialized) { - logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, int) call.") - return this - } - - // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather - // with object types... EVEN IF THERE IS A SERIALIZER - require(!clazz.isInterface) { "Cannot register an interface '${clazz}' with specified ID for serialization. It must be an implementation." } - - classesToRegister.add(ClassRegistration1(clazz, id)) - return this - } - - /** - * Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered, - * the existing entry is updated with the new serializer. - * - * - * Registering a primitive also affects the corresponding primitive wrapper. - * - * - * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this - * method. The order must be the same at deserialization as it was for serialization. - */ - @Synchronized - override fun register(clazz: Class, serializer: Serializer): Serialization { - if (initialized) { - logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer) call.") - return this - } - - // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather - // with object types... EVEN IF THERE IS A SERIALIZER - require(!clazz.isInterface) { "Cannot register an interface '${clazz.name}' with a serializer. It must be an implementation." } - - classesToRegister.add(ClassRegistration0(clazz, serializer)) - return this - } - - /** - * Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is - * overwritten. If the ID is already in use by a different type, an exception is thrown. - * - * - * Registering a primitive also affects the corresponding primitive wrapper. - * - * - * IDs must be the same at deserialization as they were for serialization. - * - * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but - * these IDs can be repurposed. - */ - @Synchronized - override fun register(clazz: Class, serializer: Serializer, id: Int): Serialization { - if (initialized) { - logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer, int) call.") - return this - } - - // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather - // with object types... EVEN IF THERE IS A SERIALIZER - require(!clazz.isInterface) { "Cannot register an interface '${clazz.name}'. It must be an implementation." } - - classesToRegister.add(ClassRegistration2(clazz, serializer, id)) - return this - } - - /** - * There is additional overhead to using RMI. - * - * This enables a "remote endpoint" to access methods and create objects (RMI) for this endpoint. - * - * This is NOT bi-directional, and this endpoint cannot access or create remote objects on the "remote client". - * - * @throws IllegalArgumentException if the iface/impl have previously been overridden - */ - @Synchronized - fun registerRmi(ifaceClass: Class, implClass: Class): Serialization { - if (initialized) { - logger.warn("Serialization manager already initialized. Ignoring duplicate registerRmiImplementation(Class, Class) call.") - return this - } - - require(ifaceClass.isInterface) { "Cannot register an implementation for RMI access. It must be an interface." } - require(!implClass.isInterface) { "Cannot register an interface for RMI implementations. It must be an implementation." } - - classesToRegister.add(ClassRegistrationIfaceAndImpl(ifaceClass, implClass, rmiClientReverseSerializer)) - - // rmiIfaceToImpl tells us, "the server" how to create a (requested) remote object - // this MUST BE UNIQUE otherwise unexpected and BAD things can happen. - val a = rmiIfaceToImpl.put(ifaceClass, implClass) - val b = rmiImplToIface.put(implClass, ifaceClass) - - require(!(a != null || b != null)) { - "Unable to override interface ($ifaceClass) and implementation ($implClass) " + - "because they have already been overridden by something else. It is critical that they are both unique per JVM" - } - return this - } - /** * Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration. If an ID * is already in use by a different type, an exception is thrown. */ - @Synchronized fun finishInit(endPointClass: Class<*>) { - this.logger = KotlinLogging.logger(endPointClass.simpleName) - - - // get all classes/fields with @Rmi field annotation. - // The field type must also be added as an RMI type -// val fieldsWithRmiAnnotation = AnnotationDetector.scanClassPath() -// .forAnnotations(Rmi::class.java) -// .on(ElementType.FIELD) -// .collect(AnnotationDefaults.getField) -// -// fieldsWithRmiAnnotation.forEach { field -> -// val fieldType = field.type -// require(fieldType.isInterface) { "RMI annotated fields must be an interface!" } -// -// logger.debug { "Adding additional @Rmi field annotation for RMI registration" } -// -// // now we add this field type as an RMI serializable -//// registerRmi(fieldType, fieldType) -// } - - - initialized = true - - // save off the class-resolver, so we can lookup the class <-> id relationships - classResolver = globalKryo.classResolver + logger = KotlinLogging.logger(endPointClass.simpleName) + if (!initialized.compareAndSet(expect = false, update = true)) { + logger.error("Unable to initialize serialization more than once!") + return + } + // this will set up the class registration information + initKryo() // now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations based on ID // in order to get the ID's, these have to be registered with a kryo instance! @@ -362,10 +331,11 @@ open class Serialization(private val references: Boolean = true, classesToRegister.forEach { registration -> val id = registration.id - // if the id == -1, it means that this registration was ignored! We don't want to include it -- but we want to log - // that something happened. - if (id == -1) { - logger.debug(registration.info) + // if the id == -1, it means that this registration was ignored! + // + // We don't want to include it -- but we want to log that something happened (the info has been customized) + if (id == ClassRegistration.IGNORE_REGISTRATION) { + logger.warn(registration.info) return@forEach } @@ -416,18 +386,20 @@ open class Serialization(private val references: Boolean = true, // so it is super trivial to map out all possible, relevant types val kryoId = classRegistration.id - if (classRegistration is ClassRegistrationIfaceAndImpl) { + if (classRegistration is ClassRegistrationForRmi) { // on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation! + val implClass = classRegistration.implClass + // RMI method caching methodCache[kryoId] = - RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, classRegistration.implClass, kryoId) + RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, implClass, kryoId) // we ALSO have to cache the instantiator for these, since these are used to create remote objects - val instantiator = globalKryo.instantiatorStrategy.newInstantiatorOf(classRegistration.implClass) - @Suppress("UNCHECKED_CAST") - rmiIfaceToInstantiator[kryoId] = instantiator as ObjectInstantiator + rmiHolder.idToInstantiator[kryoId] = + globalKryo.instantiatorStrategy.newInstantiatorOf(implClass) as ObjectInstantiator + // finally, we must save this ID, to tell the remote connection that their interface serializer must change to support // receiving an RMI impl object as a proxy object @@ -452,7 +424,6 @@ open class Serialization(private val references: Boolean = true, // save this as a byte array (so class registration validation during connection handshake is faster) val output = AeronOutput() - try { globalKryo.writeCompressed(logger, output, registrationDetails.toTypedArray()) } catch (e: Exception) { @@ -465,23 +436,6 @@ open class Serialization(private val references: Boolean = true, output.close() } - /** - * NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client - * (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the - * server, thus preventing them from trying to probe the server data structures. - * - * @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo - */ - fun getKryoRegistrationDetails(): ByteArray { - return savedRegistrationDetails - } - - /** - * @return the details of all registration IDs for RMI iface serializer rewrites - */ - fun getKryoRmiIds(): IntArray { - return savedKryoIdsForRmi - } /** * NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client @@ -560,14 +514,14 @@ open class Serialization(private val references: Boolean = true, if (!serializerMatches) { // JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff explicitly when { - serializerServer == rmiClientReverseSerializer::class.java.name -> { - // this is for when the impl is on server, and iface is on client + serializerServer == rmiServerSerializer::class.java.name -> { + // this is for when the rmi-server is on the server, and the rmi-client is on client // after this check, we tell the client that this ID is for RMI // This necessary because only 1 side registers RMI iface/impl info } - serializerClient == rmiClientReverseSerializer::class.java.name -> { - // this is for when the impl is on client, and iface is on server + serializerClient == rmiServerSerializer::class.java.name -> { + // this is for when the rmi-server is on client, and the rmi-client is on server // after this check, we tell MYSELF (the server) that this id is for RMI // This necessary because only 1 side registers RMI iface/impl info @@ -605,10 +559,50 @@ open class Serialization(private val references: Boolean = true, input.close() } - return false } + + /** + * Called when the kryo IDs are updated to be the RMI reverse serializer. + * + * NOTE: the IFACE must already be registered!! + */ + suspend fun updateKryoIdsForRmi(connection: CONNECTION, + rmiModificationIds: IntArray, + onError: suspend (String) -> Unit) { + val typeName = connection.endPoint.type.simpleName + + // store all of the classes + kryo registration IDs + + rmiModificationIds.forEach { + if (!existingRmiIds.contains(it)) { + existingRmiIds.add(it) + + + // have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on + // a single thread. + // NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct + // because they are set on initialization + + val registration = globalKryo.getRegistration(it) + val regMessage = "$typeName-side RMI serializer for registration $it -> ${registration.type}" + + if (registration.type.isInterface) { + logger.debug { "Modifying $regMessage" } + + // RMI must be with an interface. If it's not an interface then something is wrong + registration.serializer = rmiServerSerializer + } else { + // note: one way that this can be called is when BOTH the client + server register the same way for RMI IDs. When + // the endpoint serialization is initialized, we also add the RMI IDs to this list, so we don't have to worry about this specific + // scenario + onError("Ignoring unsafe modification of $regMessage") + } + } + } + } + /** * @return takes a kryo instance from the pool, or creates one if the pool was empty */ @@ -631,11 +625,14 @@ open class Serialization(private val references: Boolean = true, fun getKryoIdForRmiClient(interfaceClass: Class<*>): Int { require (interfaceClass.isInterface) { "Can only get the kryo IDs for RMI on an interface!" } - // if we are the RMI-server, we kryo-register the impl - // if we are the RMI-client, we kryo-register the iface (this is us! This method is only called on the rmi-client) + // BI-DIRECTIONAL RMI -- WILL NOT CALL THIS METHOD! - // for RMI, we store the IMPL class in the class registration -- not the iface! - return classResolver.getRegistration(interfaceClass).id + // the rmi-server will have iface+impl id's + // the rmi-client will have iface id's + + val id = rmiHolder.ifaceToId[interfaceClass]!! + require(id != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" } + return id } /** @@ -645,17 +642,19 @@ open class Serialization(private val references: Boolean = true, */ fun createRmiObject(interfaceClassId: Int, objectParameters: Array?): Any { try { - if (objectParameters == null) { - return rmiIfaceToInstantiator[interfaceClassId].newInstance() + if (objectParameters.isNullOrEmpty()) { + // simple, easy, fast. + return rmiHolder.idToInstantiator[interfaceClassId].newInstance() } - val size = objectParameters.size - // we have to get the constructor for this object. - val clazz = classResolver.getRegistration(interfaceClassId).type - val constructors = clazz.declaredConstructors + val clazz: Class<*> = rmiHolder.idToImpl[interfaceClassId] ?: + return IllegalArgumentException("Cannot create RMI object for kryo interfaceClassId: $interfaceClassId (no class exists)") + // now have to find the closest match. + val constructors = clazz.declaredConstructors + val size = objectParameters.size val matchedBySize = constructors.filter { it.parameterCount == size } if (matchedBySize.size == 1) { @@ -681,7 +680,7 @@ open class Serialization(private val references: Boolean = true, // find the constructor with the highest match matchedByType.sortByDescending { it.first } return matchedByType[0].second.newInstance(*objectParameters) - } catch(e: Exception) { + } catch (e: Exception) { return e } } @@ -693,15 +692,6 @@ open class Serialization(private val references: Boolean = true, return methodCache[classId] } - /** - * @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client - * and server, even if in the same JVM. This only attempts to register once. - */ - @Synchronized - fun initialized(): Boolean { - return initialized - } - /** * # BLOCKING * @@ -790,36 +780,6 @@ open class Serialization(private val references: Boolean = true, } } - suspend fun updateKryoIdsForRmi(connection: CONNECTION, rmiModificationIds: IntArray, onError: suspend (String) -> Unit) { - val typeName = connection.endPoint.type.simpleName - - rmiModificationIds.forEach { - if (!existingRmiIds.contains(it)) { - existingRmiIds.add(it) - - // have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on - // a single thread. - // NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct - - val registration = globalKryo.getRegistration(it) - val regMessage = "$typeName-side RMI serializer for registration $it -> ${registration.type}" - - if (registration.type.isInterface) { - logger.debug { - "Modifying $regMessage" - } - // RMI must be with an interface. If it's not an interface then something is wrong - registration.serializer = rmiClientReverseSerializer - } else { - // note: one way that this can be called is when BOTH the client + server register the same way for RMI IDs. When - // the endpoint serialization is initialized, we also add the RMI IDs to this list, so we don't have to worry about this specific - // scenario - onError("Attempting an unsafe modification of $regMessage") - } - } - } - } - // NOTE: These following functions are ONLY called on a single thread! fun readMessage(buffer: DirectBuffer, offset: Int, length: Int): Any? { return globalKryo.read(buffer, offset, length) diff --git a/test/dorkboxTest/network/rmi/RmiOverrideAndProxyTest.kt b/test/dorkboxTest/network/rmi/RmiNestedTest.kt similarity index 54% rename from test/dorkboxTest/network/rmi/RmiOverrideAndProxyTest.kt rename to test/dorkboxTest/network/rmi/RmiNestedTest.kt index b12a4186..1187870f 100644 --- a/test/dorkboxTest/network/rmi/RmiOverrideAndProxyTest.kt +++ b/test/dorkboxTest/network/rmi/RmiNestedTest.kt @@ -16,10 +16,8 @@ package dorkboxTest.network.rmi import dorkbox.network.Client -import dorkbox.network.Configuration import dorkbox.network.Server import dorkbox.network.connection.Connection -import dorkbox.network.rmi.Rmi import dorkbox.util.exceptions.SecurityException import dorkboxTest.network.BaseTest import kotlinx.coroutines.runBlocking @@ -30,18 +28,13 @@ import java.util.concurrent.atomic.AtomicInteger -class RmiOverrideAndProxyTest : BaseTest() { +@Suppress("unused", "RedundantSuspendModifier") +class RmiNestedTest : BaseTest() { companion object { private val idCounter = AtomicInteger() } - @Test - @Throws(SecurityException::class, IOException::class) - fun rmiNetwork() { - rmi() - } - @Test @Throws(SecurityException::class, IOException::class) fun rmiIPC() { @@ -79,15 +72,13 @@ class RmiOverrideAndProxyTest : BaseTest() { * The implType (if it exists, with the same name, and with the same signature + connection parameter) will be called from the interface * instead of the method that would NORMALLY be called. */ - fun rmi(config: (Configuration) -> Unit = {}) { + @Test + fun biDirectionalDoubleRmi() { run { val configuration = serverConfig() - config(configuration) - configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java) - configuration.serialization.register(TestObject::class.java) // the iface is again, on purpose to verify registration orders! + configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java) configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java) -// configuration.serialization.register(OtherObjectImpl::class.java) // registered because this class is sent over the wire val server = Server(configuration) addEndPoint(server) @@ -110,16 +101,13 @@ class RmiOverrideAndProxyTest : BaseTest() { run { val configuration = clientConfig() - config(configuration) - configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java) + configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java) configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java) -// configuration.serialization.register(OtherObjectImpl::class.java) // registered because this class is sent over the wire val client = Client(configuration) addEndPoint(client) - client.onConnect { connection -> connection.logger.error("Connected") connection.createObject { rmiId, remoteObject -> @@ -157,6 +145,145 @@ class RmiOverrideAndProxyTest : BaseTest() { waitForThreads() } + @Test + fun doubleRmi() { + run { + val configuration = serverConfig() + configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java) + configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java) + + val server = Server(configuration) + addEndPoint(server) + + server.onMessage { connection, message -> + // The test is complete when the client sends the OtherObject instance. + // this 'object' is the REAL object, not a proxy, because this object is created within this connection. + if (message.value() == 12.34f) { + stopEndPoints() + } else { + Assert.fail("Incorrect object value") + } + } + + runBlocking { + server.bind(false) + } + } + + + run { + val configuration = clientConfig() + configuration.serialization.register(TestObject::class.java) + configuration.serialization.register(OtherObject::class.java) + + val client = Client(configuration) + addEndPoint(client) + + client.onConnect { connection -> + connection.logger.error("Connected") + connection.createObject { rmiId, remoteObject -> + connection.logger.error("Starting test") + remoteObject.setValue(43.21f) + + // Normal remote method call. + Assert.assertEquals(43.21f, remoteObject.other(), .0001f) + + // Make a remote method call that returns another remote proxy object. + // the "test" object exists in the REMOTE side, as does the "OtherObject" that is created. + // here we have a proxy to both of them. + val otherObject: OtherObject = remoteObject.getOtherObject() + + // Normal remote method call on the second object. + otherObject.setValue(12.34f) + val value = otherObject.value() + Assert.assertEquals(12.34f, value, .0001f) + + + // make sure the "local" object and the "remote" object have the same values + Assert.assertEquals(12.34f, remoteObject.getOtherValue(), .0001f) + + + // When a proxy object is sent, the other side receives its ACTUAL object (not a proxy of it), because + // that is where that object actually exists. + connection.send(otherObject) + } + } + + runBlocking { + client.connect(LOOPBACK, 5000) + } + } + waitForThreads(999999) + } + + @Test + fun singleRmi() { + run { + val configuration = serverConfig() + configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java) + configuration.serialization.register(OtherObjectImpl::class.java) + + val server = Server(configuration) + addEndPoint(server) + + server.onMessage { connection, message -> + // The test is complete when the client sends the OtherObject instance. + // this 'object' is the REAL object + if (message.value() == 43.21f) { + stopEndPoints() + } else { + Assert.fail("Incorrect object value") + } + } + + runBlocking { + server.bind(false) + } + } + + + run { + val configuration = clientConfig() + configuration.serialization.register(TestObject::class.java) + configuration.serialization.register(OtherObjectImpl::class.java) + + val client = Client(configuration) + addEndPoint(client) + + client.onConnect { connection -> + connection.logger.error("Connected") + connection.createObject { rmiId, remoteObject -> + connection.logger.error("Starting test") + remoteObject.setOtherValue(43.21f) + + // Normal remote method call. + Assert.assertEquals(43.21f, remoteObject.getOtherValue(), .0001f) + + // real object + val otherObject: OtherObject = remoteObject.getOtherObject() + + // Normal remote method call on the second object. + val value = otherObject.value() + Assert.assertEquals(43.21f, value, .0001f) + + + // When a proxy object is sent, the other side receives its ACTUAL object (not a proxy of it), because + // that is where that object actually exists. + connection.send(otherObject) + } + } + + runBlocking { + client.connect(LOOPBACK, 5000) + } + } + waitForThreads(999999) + } + + + + + private interface TestObject { suspend fun setValue(aFloat: Float) suspend fun setOtherValue(aFloat: Float) @@ -174,7 +301,6 @@ class RmiOverrideAndProxyTest : BaseTest() { @Transient private val ID = idCounter.getAndIncrement() - @Rmi private val otherObject: OtherObject = OtherObjectImpl() private var aFloat = 0f @@ -204,10 +330,47 @@ class RmiOverrideAndProxyTest : BaseTest() { } override fun getOtherObject(): OtherObject { + return otherObject + } + + override fun hashCode(): Int { + return ID + } + } + + private class TestObjectAnnotImpl : TestObject { + @Transient + private val ID = idCounter.getAndIncrement() + + private val otherObject: OtherObject = OtherObjectImpl() + + private var aFloat = 0f + override suspend fun setValue(aFloat: Float) { throw RuntimeException("Whoops!") } - fun getOtherObject(connection: Connection): OtherObject { + suspend fun setValue(connection: Connection, aFloat: Float) { + connection.logger.error("receiving") + this.aFloat = aFloat + } + + override suspend fun setOtherValue(aFloat: Float) { + otherObject.setValue(aFloat) + } + + override suspend fun getOtherValue(): Float { + return otherObject.value() + } + + override fun other(): Float { + throw RuntimeException("Whoops!") + } + + fun other(connection: Connection): Float { + return aFloat + } + + override fun getOtherObject(): OtherObject { return otherObject }