Better support for sending RMI objects

This commit is contained in:
nathan 2020-09-01 09:12:30 +02:00
parent a20c55a864
commit 9ec8f4efe3
16 changed files with 759 additions and 393 deletions

View File

@ -72,7 +72,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
private val previousClosedConnectionActivity: Long = 0
private val rmiConnectionSupport = RmiManagerConnections(logger, listenerManager, rmiGlobalSupport, serialization)
private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization)
init {
// have to do some basic validation of our configuration
@ -498,8 +498,17 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveObject(`object`: Any): Int {
return rmiConnectionSupport.saveImplObject(`object`)
val rmiId = rmiConnectionSupport.saveImplObject(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
return rmiId
}
return rmiId
}
/**
@ -519,8 +528,15 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveObject(`object`: Any, objectId: Int): Boolean {
return rmiConnectionSupport.saveImplObject(`object`, objectId)
val success = rmiConnectionSupport.saveImplObject(`object`, objectId)
if (!success) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return success
}
/**
@ -546,7 +562,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val kryoId = serialization.getKryoIdForRmiClient(Iface::class.java)
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiConnectionSupport.getRemoteObject(getConnection(), kryoId, objectId, Iface::class.java)
return rmiConnectionSupport.getProxyObject(getConnection(), kryoId, objectId, Iface::class.java)
}
/**
@ -630,8 +646,15 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveGlobalObject(`object`: Any): Int {
return rmiGlobalSupport.saveImplObject(`object`)
val rmiId = rmiGlobalSupport.saveImplObject(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return rmiId
}
/**
@ -651,8 +674,16 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveGlobalObject(`object`: Any, objectId: Int): Boolean {
return rmiGlobalSupport.saveImplObject(`object`, objectId)
val success = rmiGlobalSupport.saveImplObject(`object`, objectId)
if (!success) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return success
}
/**

View File

@ -20,6 +20,7 @@ import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.server.ServerException
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.UdpMediaDriverConnection
import dorkbox.network.connection.connectionType.ConnectionProperties
import dorkbox.network.connection.connectionType.ConnectionRule
@ -530,8 +531,16 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveGlobalObject(`object`: Any): Int {
return rmiGlobalSupport.saveImplObject(`object`)
val rmiId = rmiGlobalSupport.saveImplObject(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
return rmiId
}
return rmiId
}
/**
@ -551,7 +560,14 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveGlobalObject(`object`: Any, objectId: Int): Boolean {
return rmiGlobalSupport.saveImplObject(`object`, objectId)
val success = rmiGlobalSupport.saveImplObject(`object`, objectId)
if (!success) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return success
}
}

View File

@ -438,8 +438,16 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveObject(`object`: Any): Int {
return rmiConnectionSupport.saveImplObject(`object`)
val rmiId = rmiConnectionSupport.saveImplObject(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.value?.notifyError(this, exception)
}
return rmiId
}
/**
@ -459,8 +467,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @see RemoteObject
*/
@Suppress("DuplicatedCode")
suspend fun saveObject(`object`: Any, objectId: Int): Boolean {
return rmiConnectionSupport.saveImplObject(`object`, objectId)
val success = rmiConnectionSupport.saveImplObject(`object`, objectId)
if (!success) {
val exception = Exception("RMI implementation '${`object`::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.value?.notifyError(this, exception)
}
return success
}
/**
@ -487,7 +502,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val kryoId = endPoint.serialization.getKryoIdForRmiClient(Iface::class.java)
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiConnectionSupport.getRemoteObject(this, kryoId, objectId, Iface::class.java)
return rmiConnectionSupport.getProxyObject(this, kryoId, objectId, Iface::class.java)
}
/**

View File

@ -141,7 +141,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// we only want one instance of these created. These will be called appropriately
val settingsStore: SettingsStore
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, listenerManager, actionDispatch, config.serialization)
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch, config.serialization)
init {
logger.error("NETWORK STACK IS ONLY IPV4 AT THE MOMENT. IPV6 is in progress!")
@ -337,7 +337,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
* from a "global" context
*/
internal open fun getRmiConnectionSupport() : RmiManagerConnections<CONNECTION> {
return RmiManagerConnections(logger, listenerManager, rmiGlobalSupport, serialization)
return RmiManagerConnections(logger, rmiGlobalSupport, serialization)
}
/**

View File

@ -24,11 +24,9 @@ import dorkbox.network.serialization.Serialization
import dorkbox.util.collections.LockFreeIntMap
import mu.KLogger
internal class RmiManagerConnections<CONNECTION: Connection>(
logger: KLogger,
listenerManager: ListenerManager<CONNECTION>,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: Serialization) : RmiObjectCache<CONNECTION>(listenerManager, logger) {
internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: Serialization) : RmiObjectCache(logger) {
// It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections!
private val proxyObjects = LockFreeIntMap<RemoteObject>()
@ -51,20 +49,20 @@ internal class RmiManagerConnections<CONNECTION: Connection>(
/**
* on the connection+client to get a connection-specific remote object (that exists on the server/client)
*/
fun <Iface> getRemoteObject(connection: Connection, kryoId: Int, objectId: Int, interfaceClass: Class<Iface>): Iface {
fun <Iface> getProxyObject(connection: Connection, kryoId: Int, rmiId: Int, interfaceClass: Class<Iface>): Iface {
require(interfaceClass.isInterface) { "iface must be an interface." }
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
var proxyObject = getProxyObject(rmiId)
if (proxyObject == null) {
proxyObject = RmiManagerGlobal.createProxyObject(false,
connection,
serialization,
rmiGlobalSupport.rmiResponseManager,
objectId,
kryoId,
rmiId,
interfaceClass)
saveProxyObject(objectId, proxyObject)
saveProxyObject(rmiId, proxyObject)
}
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
@ -97,30 +95,29 @@ internal class RmiManagerConnections<CONNECTION: Connection>(
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val kryoId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters
val serialization = endPoint.serialization
// We have to lookup the iface, since the proxy object requires it
val implObject = endPoint.serialization.createRmiObject(kryoId, objectParameters)
val implObject = serialization.createRmiObject(kryoId, objectParameters)
val response = if (implObject is Exception) {
// whoops!
ListenerManager.cleanStackTrace(implObject)
endPoint.listenerManager.notifyError(connection, implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI))
} else {
val rmiId = saveImplObject(implObject)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = NullPointerException("Trying to create an RMI object with the INVALID_RMI id!!")
ListenerManager.cleanStackTrace(exception)
endPoint.listenerManager.notifyError(connection, exception)
}
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId))
}
// we send the message ALWAYS, because the client needs to know it worked or not
connection.send(response)
}

View File

@ -32,11 +32,9 @@ import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
internal class RmiManagerGlobal<CONNECTION : Connection>(
logger: KLogger,
listenerManager: ListenerManager<CONNECTION>,
actionDispatch: CoroutineScope,
internal val serialization: Serialization) : RmiObjectCache<CONNECTION>(listenerManager, logger) {
internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: Serialization) : RmiObjectCache(logger) {
companion object {
/**
@ -208,7 +206,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(
/**
* called on "server"
*/
onGlobalObjectCreateRequest(endPoint, connection, message, logger)
onGlobalObjectCreateRequest(endPoint, connection, message)
}
is GlobalObjectCreateResponse -> {
/**
@ -367,14 +365,17 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(
/**
* called on "server"
*/
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<CONNECTION>, connection: CONNECTION, message: GlobalObjectCreateRequest, logger: KLogger) {
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<CONNECTION>,
connection: CONNECTION,
message: GlobalObjectCreateRequest) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters
val serialization = endPoint.serialization
// We have to lookup the iface, since the proxy object requires it
val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters)
val implObject = serialization.createRmiObject(interfaceClassId, objectParameters)
val response = if (implObject is Exception) {
// whoops!

View File

@ -15,8 +15,6 @@
*/
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ListenerManager
import mu.KLogger
/**
@ -25,39 +23,22 @@ import mu.KLogger
* The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there
* would be conflicts in the data structure)
*/
internal open class RmiObjectCache<CONNECTION: Connection>(private val listenerManager: ListenerManager<CONNECTION>, logger: KLogger) {
internal open class RmiObjectCache(logger: KLogger) {
private val implObjects = RemoteObjectStorage(logger)
/**
* @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted)
*/
suspend fun saveImplObject(rmiObject: Any): Int {
val rmiId = implObjects.register(rmiObject)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return rmiId
fun saveImplObject(rmiObject: Any): Int {
return implObjects.register(rmiObject)
}
/**
* @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted)
*/
suspend fun saveImplObject(rmiObject: Any, objectId: Int): Boolean {
val success = implObjects.register(rmiObject, objectId)
if (!success) {
val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return success
fun saveImplObject(rmiObject: Any, objectId: Int): Boolean {
return implObjects.register(rmiObject, objectId)
}
fun <T> getImplObject(rmiId: Int): T? {

View File

@ -49,8 +49,4 @@ class TimeoutException : IOException {
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
constructor(message: String?) : super(message) {}
constructor(cause: Throwable?) : super(cause) {}
companion object {
private const val serialVersionUID = -3526277240277423682L
}
}

View File

@ -47,6 +47,8 @@ import java.lang.reflect.Proxy
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* rmi-server MUST registerRmi both the iface+impl
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/

View File

@ -40,6 +40,7 @@ import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.serialization.KryoExtra
import dorkbox.network.serialization.Serialization.Companion.INVALID_KRYO_ID
/**
* This is to manage serializing RMI objects across the wire...
@ -65,33 +66,60 @@ import dorkbox.network.serialization.KryoExtra
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* rmi-server MUST registerRmi both the iface+impl
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/
class RmiClientReverseSerializer : Serializer<Any>(false) {
class RmiServerSerializer : Serializer<Any>(false) {
override fun write(kryo: Kryo, output: Output, `object`: Any) {
val kryoExtra = kryo as KryoExtra
val rmiConnectionSupport = kryoExtra.connection.rmiConnectionSupport
// have to write what the rmi ID is ONLY. We have to find out if it's a global object or connection scope object!
val connection = kryoExtra.connection
val rmiConnectionSupport = connection.rmiConnectionSupport
// check connection scope first
var id = rmiConnectionSupport.getId(`object`)
// have to write what the rmi ID is ONLY. A remote object sent via a connection IS ONLY a connection-scope object!
// check global scope second
if (id == RemoteObjectStorage.INVALID_RMI) {
id = rmiConnectionSupport.rmiGlobalSupport.getId(`object`)
// check if we have saved it already
var rmiId = rmiConnectionSupport.getId(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
// this means we have to save it. This object is cached so we can have an association between rmiID <-> object
rmiId = rmiConnectionSupport.saveImplObject(`object`)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
connection.logger.error("Unable to save $`object` for use as RMI!")
}
}
output.writeInt(id, true)
output.writeInt(rmiId, true)
}
override fun read(kryo: Kryo, input: Input, iface: Class<*>): Any? {
override fun read(kryo: Kryo, input: Input, interfaceClass: Class<*>): Any? {
val kryoExtra = kryo as KryoExtra
val objectID = input.readInt(true)
val rmiId = input.readInt(true)
val connection = kryoExtra.connection
val kryoId = connection.endPoint.serialization.getKryoIdForRmiClient(iface)
return connection.rmiConnectionSupport.getRemoteObject(connection, kryoId, objectID, iface)
val serialization = connection.endPoint.serialization
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
throw NullPointerException("RMI ID is invalid. Unable to use proxy object!")
}
// the rmi-server will have iface+impl id's
// the rmi-client will have iface id's
return if (interfaceClass.isInterface) {
// normal case. RMI only on 1 side
val kryoId = serialization.rmiHolder.ifaceToId[interfaceClass]!!
require(kryoId != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" }
connection.rmiConnectionSupport.getProxyObject(connection, kryoId, rmiId, interfaceClass)
} else {
// BI-DIRECTIONAL RMI -- THIS IS NOT NORMAL!
// this won't be an interface. It will be an impl (because of how RMI is setup)
val kryoId = serialization.rmiHolder.implToId[interfaceClass]!!
require(kryoId != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" }
val iface = serialization.rmiHolder.idToIface[kryoId]
connection.rmiConnectionSupport.getProxyObject(connection, kryoId, rmiId, iface)
}
}
}

View File

@ -16,39 +16,61 @@
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
import dorkbox.util.collections.IdentityMap
import dorkbox.network.rmi.messages.RmiServerSerializer
internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) {
companion object {
const val IGNORE_REGISTRATION = -1
}
var info: String = ""
fun register(kryo: KryoExtra, rmiIfaceToImpl: IdentityMap<Class<*>, Class<*>>) {
// we have to check if this registration ALREADY exists for RMI. If so, we ignore it.
// RMI kryo-registration is SPECIFICALLY for impl object ONLY DURING INITIAL REGISTRATION!
// if the registration is modified, then the registration will be the iface
if (clazz.isInterface) {
val impl = rmiIfaceToImpl[clazz]
if (impl != null && kryo.classResolver.getRegistration(impl)?.serializer is RmiClientReverseSerializer) {
// do nothing, because this is already registered for RMI
info = "Removed RMI conflict registration for class ${clazz.name}"
id = -1
return
}
/**
* we have to check if this registration ALREADY exists for RMI.
*
* If so, we ignore it - any IFACE or IMPL that already has been assigned to an RMI serializer, *MUST* remain an RMI serializer
* If this class registration will EVENTUALLY be for RMI, then [ClassRegistrationForRmi] will reassign the serializer
*/
open fun register(kryo: KryoExtra, rmi: RmiHolder) {
val savedKryoId: Int? = rmi.ifaceToId[clazz]
} else {
if (kryo.classResolver.getRegistration(clazz)?.serializer is RmiClientReverseSerializer) {
// do nothing, because this is already registered for RMI
info = "Removed RMI conflict registration for class ${clazz.name}"
id = -1
return
}
}
var overriddenSerializer: Serializer<Any>? = null
// otherwise, we are OK to continue to register this
register(kryo)
}
// did we already process this class? We permit overwriting serializers, etc!
if (savedKryoId != null) {
overriddenSerializer = kryo.classResolver.getRegistration(savedKryoId)?.serializer
when (overriddenSerializer) {
is RmiServerSerializer -> {
// do nothing, because this is ALREADY registered for RMI
info = if (serializer == null) {
"CONFLICTED $savedKryoId -> (RMI) Ignored duplicate registration for ${clazz.name}"
} else {
"CONFLICTED $savedKryoId -> (RMI) Ignored duplicate registration for ${clazz.name} (${serializer.javaClass.name})"
}
// mark this for later, so we don't try to do something with it
id = IGNORE_REGISTRATION
return
}
else -> {
// mark that this was overridden!
}
}
}
// otherwise, we are OK to continue to register this
register(kryo)
if (overriddenSerializer != null) {
info = "$info (Replaced $overriddenSerializer)"
}
// now, we want to save the relationship between classes and kryoId
rmi.idToIface[id] = clazz
rmi.ifaceToId[clazz] = id
}
open fun register(kryo: KryoExtra) {}
abstract fun register(kryo: KryoExtra)
abstract fun getInfoArray(): Array<Any>
}

View File

@ -20,7 +20,7 @@ import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) {
override fun register(kryo: KryoExtra) {
id = kryo.register(clazz, serializer).id
info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass?.name}"
info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass.name}"
}
override fun getInfoArray(): Array<Any> {

View File

@ -0,0 +1,136 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.serialization
import dorkbox.network.rmi.messages.RmiServerSerializer
/**
* This is to manage serializing RMI objects across the wire...
*
* NOTE:
* CLIENT: can never send the iface object, if it's RMI, it will send the java Proxy object instead.
* SERVER: can never send the iface object, it will always send the IMPL object instead (because of how kryo works)
*
* **************************
* NOTE: This works because we TRICK kryo serialization by changing what the kryo ID serializer is on each end of the connection
* **************************
*
* What we do is on the server, REWRITE the kryo ID for the impl so that it will send just the rmi ID instead of the object
* on the client, this SAME kryo ID must have this serializer as well, so the proxy object is re-assembled.
*
* Kryo serialization works by inspecting the field VALUE type, not the field DEFINED type... So if you send an actual object, you must
* register specifically for the implementation object.
*
*
* To recap:
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* rmi-server MUST registerRmi both the iface+impl
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/
internal class ClassRegistrationForRmi(ifaceClass: Class<*>,
val implClass: Class<*>,
serializer: RmiServerSerializer) : ClassRegistration(ifaceClass, serializer) {
/**
* In general:
*
* ALL kryo registrations must be for IMPL, because of how kryo works, kryo can ONLY send IMPL objects, and thus serialization
* can only be for IMPL objects. (we can write fields for an IFACE, but when constructing an object, it must be concrete)
*
* To recap how RMI works:
* rmi-client works ONLY with proxy objects
* rmi-server works ONLY with impl objects
* **NOTE: it is possible to have both ends of a connection be BOTH the rmi-client+server (for bi-directional RMI)
*
*
* ####
* To SEND/RECEIVE an RMI object
*
* for rmi-client -> {object} -> rmi-server
* REQUIRES:
* rmi-client: (java proxy object -> bytes)
* register InvocationHandler class with RmiClientSerializer
* register IFACE class
* rmi-server: (bytes -> java impl object)
* register InvocationHandler class with RmiClientSerializer
* register IMPL class
* able to lookup rmiID -> IMPL object
*
* rmi-client -> send proxy object -> {{kryo: InvocationHandler, sends rmiID}}
* {{network}}
* {{kryo: InvocationHandler looks up impl object based on ID}} -> rmi-server receives IMPL object
*
*
* for rmi-server -> {object} -> rmi-client
* REQUIRES:
* rmi-server: (java impl object -> bytes)
* register IMPL object class with RmiServerSerializer
* able to lookup IMPL object -> rmiID
* rmi-client: (bytes -> java proxy object)
* register IFACE class with RmiServerSerializer
* able to lookup/create rmiID -> proxy object
*
* rmi-server -> send impl object -> {{kryo: RmiServerSerializer, sends rmiId}}
* {{network}}
* {{kryo: RmiServerSerializer read rmiID, looks up/creates proxy object using IFACE}} -> rmi-client receives proxy object
*
*
* Requirements for all cases of RMI
* rmi-client:
* send: register InvocationHandler class with RmiClientSerializer
* receive: register IMPL object class with RmiServerSerializer
* lookup IMPL object -> rmiID
*
* rmi-server:
* receive: register InvocationHandler class with RmiClientSerializer
* lookup rmiID -> IMPL object
* send: register IMPL object class with RmiServerSerializer
* lookup IMPL object -> rmiID
*/
override fun register(kryo: KryoExtra, rmi: RmiHolder) {
// we override this, because we ALWAYS will call our RMI registration!
// have to get the ID for the interface (if it exists)
val registration = kryo.classResolver.getRegistration(clazz) // this is ifaceClass, and must match what is defined on the rmi client
if (registration != null) {
id = registration.id
// override that registration
kryo.register(implClass, serializer, id)
} else {
// now register the impl class
id = kryo.register(implClass, serializer).id
}
info = "Registered $id -> (RMI) ${implClass.name}"
// now, we want to save the relationship between classes and kryoId
rmi.ifaceToId[clazz] = id
rmi.idToIface[id] = clazz
// we have to know what the IMPL class is so we can create it for a "createObject" RMI command
rmi.implToId[implClass] = id
rmi.idToImpl[id] = implClass
}
override fun getInfoArray(): Array<Any> {
// the info array has to match for the INTERFACE (not the impl!)
return arrayOf(id, clazz.name, serializer!!::class.java.name)
}
}

View File

@ -0,0 +1,18 @@
package dorkbox.network.serialization
import org.agrona.collections.Int2ObjectHashMap
import org.agrona.collections.Object2IntHashMap
import org.objenesis.instantiator.ObjectInstantiator
/// RMI things
class RmiHolder {
val idToInstantiator : Int2ObjectHashMap<ObjectInstantiator<Any>> = Int2ObjectHashMap()
// note: ONLY RegisterRmi (iface+impl) will have their impl class info saved!
val ifaceToId = Object2IntHashMap<Class<*>>(Serialization.INVALID_KRYO_ID)
val implToId = Object2IntHashMap<Class<*>>(Serialization.INVALID_KRYO_ID)
val idToImpl = Int2ObjectHashMap<Class<*>>()
val idToIface = Int2ObjectHashMap<Class<*>>()
}

View File

@ -15,7 +15,7 @@
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.ClassResolver
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
@ -35,13 +35,12 @@ import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodRequestSerializer
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
import dorkbox.network.rmi.messages.RmiClientSerializer
import dorkbox.network.rmi.messages.RmiServerSerializer
import dorkbox.os.OS
import dorkbox.util.collections.IdentityMap
import dorkbox.util.serialization.SerializationDefaults
import dorkbox.util.serialization.SerializationManager
import kotlinx.coroutines.channels.Channel
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
@ -56,6 +55,7 @@ import java.lang.reflect.InvocationHandler
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.coroutines.Continuation
/**
* Threads reading/writing at the same time a single instance of kryo. it is possible to use a single kryo with the use of
* synchronize, however - that defeats the point of having multi-threaded serialization.
@ -76,14 +76,17 @@ import kotlin.coroutines.Continuation
* an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see
* Kryo#newDefaultSerializer(Class)
*/
open class Serialization(private val references: Boolean = true,
private val factory: SerializerFactory<*>? = null) : SerializationManager<DirectBuffer> {
open class Serialization(private val references: Boolean = true, private val factory: SerializerFactory<*>? = null) : SerializationManager<DirectBuffer> {
companion object {
// -2 is the same value that kryo uses for invalid id's
const val INVALID_KRYO_ID = -2
}
private lateinit var logger: KLogger
private var initialized = false
private val kryoPool: Channel<KryoExtra>
lateinit var classResolver: ClassResolver
private var initialized = atomic(false)
private val kryoPool = MultithreadConcurrentQueue<KryoExtra>(1024) // reasonable size of available kryo's
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class)
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
@ -92,12 +95,6 @@ open class Serialization(private val references: Boolean = true,
private lateinit var savedKryoIdsForRmi: IntArray
private lateinit var savedRegistrationDetails: ByteArray
/// RMI things
private val rmiIfaceToInstantiator : Int2ObjectHashMap<ObjectInstantiator<Any>> = Int2ObjectHashMap()
private val rmiIfaceToImpl = IdentityMap<Class<*>, Class<*>>()
private val rmiImplToIface = IdentityMap<Class<*>, Class<*>>()
// This is a GLOBAL, single threaded only kryo instance.
// This is to make sure that we have an instance of class registration done correctly and (if not) we are
// are notified on the initial thread (instead of on the network update thread)
@ -109,32 +106,161 @@ open class Serialization(private val references: Boolean = true,
private val methodRequestSerializer = MethodRequestSerializer()
private val methodResponseSerializer = MethodResponseSerializer()
private val continuationSerializer = ContinuationSerializer()
private val rmiClientSerializer = RmiClientSerializer()
private val rmiClientReverseSerializer = RmiClientReverseSerializer()
private val rmiServerSerializer = RmiServerSerializer()
val rmiHolder = RmiHolder()
// list of already seen client RMI ids (which the server might not have registered as RMI types).
private var existingRmiIds = CopyOnWriteArrayList<Int>()
// the purpose of the method cache, is to accelerate looking up methods for specific class
private val methodCache : Int2ObjectHashMap<Array<CachedMethod>> = Int2ObjectHashMap()
// reflectASM doesn't work on android
private val useAsm = !OS.isAndroid()
init {
// reasonable size of available kryo's before coroutines are suspended during read/write
val KRYO_COUNT = 64
/**
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
* If the class is already registered, the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method.
*
* The order must be the same at deserialization as it was for serialization.
*
* This must happen before the creation of the client/server
*/
override fun <T> register(clazz: Class<T>): Serialization {
require(!initialized.value) { "Serialization 'register(class)' cannot happen after client/server initialization!" }
kryoPool = Channel(KRYO_COUNT)
// // The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// // with object types... EVEN IF THERE IS A SERIALIZER
// require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
classesToRegister.add(ClassRegistration3(clazz))
return this
}
/**
* Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID
* is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
* IDs must be the same at deserialization as they were for serialization.
*
* This must happen before the creation of the client/server
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
override fun <T> register(clazz: Class<T>, id: Int): Serialization {
require(!initialized.value) { "Serialization 'register(Class, int)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
classesToRegister.add(ClassRegistration1(clazz, id))
return this
}
/**
* Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered,
* the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): Serialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz.name}' with a serializer. It must be an implementation." }
classesToRegister.add(ClassRegistration0(clazz, serializer))
return this
}
/**
* Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is
* overwritten. If the ID is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): Serialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer, int)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz.name}'. It must be an implementation." }
classesToRegister.add(ClassRegistration2(clazz, serializer, id))
return this
}
/**
* There is additional overhead to using RMI.
*
* This enables a "remote endpoint" to access methods and create objects (RMI) for this endpoint.
*
* This is NOT bi-directional, and this endpoint cannot access or create remote objects on the "remote client".
*
* @throws IllegalArgumentException if the iface/impl have previously been overridden
*/
@Synchronized
fun <Iface, Impl : Iface> registerRmi(ifaceClass: Class<Iface>, implClass: Class<Impl>): Serialization {
require(!initialized.value) { "Serialization 'registerRmi(Class, Class)' cannot happen after client/server initialization!" }
require(ifaceClass.isInterface) { "Cannot register an implementation for RMI access. It must be an interface." }
require(!implClass.isInterface) { "Cannot register an interface for RMI implementations. It must be an implementation." }
classesToRegister.add(ClassRegistrationForRmi(ifaceClass, implClass, rmiServerSerializer))
return this
}
/**
* NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
* (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the
* server, thus preventing them from trying to probe the server data structures.
*
* @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo
*/
fun getKryoRegistrationDetails(): ByteArray {
return savedRegistrationDetails
}
/**
* @return the details of all registration IDs for RMI iface serializer rewrites
*/
fun getKryoRmiIds(): IntArray {
return savedKryoIdsForRmi
}
/**
* called as the first think inside [finishInit]
*/
private fun initKryo(): KryoExtra {
val kryo = KryoExtra(methodCache)
@ -174,7 +300,7 @@ open class Serialization(private val references: Boolean = true,
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
// note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again
classesToRegister.forEach { registration ->
registration.register(kryo, rmiIfaceToImpl)
registration.register(kryo, rmiHolder)
}
if (factory != null) {
@ -184,177 +310,20 @@ open class Serialization(private val references: Boolean = true,
return kryo
}
/**
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
* If the class is already registered, the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class) call.")
} else {
classesToRegister.add(ClassRegistration3(clazz))
}
return this
}
/**
* Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID
* is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, id: Int): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, int) call.")
return this
}
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register an interface '${clazz}' with specified ID for serialization. It must be an implementation." }
classesToRegister.add(ClassRegistration1(clazz, id))
return this
}
/**
* Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered,
* the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer) call.")
return this
}
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register an interface '${clazz.name}' with a serializer. It must be an implementation." }
classesToRegister.add(ClassRegistration0(clazz, serializer))
return this
}
/**
* Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is
* overwritten. If the ID is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer, int) call.")
return this
}
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register an interface '${clazz.name}'. It must be an implementation." }
classesToRegister.add(ClassRegistration2(clazz, serializer, id))
return this
}
/**
* There is additional overhead to using RMI.
*
* This enables a "remote endpoint" to access methods and create objects (RMI) for this endpoint.
*
* This is NOT bi-directional, and this endpoint cannot access or create remote objects on the "remote client".
*
* @throws IllegalArgumentException if the iface/impl have previously been overridden
*/
@Synchronized
fun <Iface, Impl : Iface> registerRmi(ifaceClass: Class<Iface>, implClass: Class<Impl>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate registerRmiImplementation(Class, Class) call.")
return this
}
require(ifaceClass.isInterface) { "Cannot register an implementation for RMI access. It must be an interface." }
require(!implClass.isInterface) { "Cannot register an interface for RMI implementations. It must be an implementation." }
classesToRegister.add(ClassRegistrationIfaceAndImpl(ifaceClass, implClass, rmiClientReverseSerializer))
// rmiIfaceToImpl tells us, "the server" how to create a (requested) remote object
// this MUST BE UNIQUE otherwise unexpected and BAD things can happen.
val a = rmiIfaceToImpl.put(ifaceClass, implClass)
val b = rmiImplToIface.put(implClass, ifaceClass)
require(!(a != null || b != null)) {
"Unable to override interface ($ifaceClass) and implementation ($implClass) " +
"because they have already been overridden by something else. It is critical that they are both unique per JVM"
}
return this
}
/**
* Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration. If an ID
* is already in use by a different type, an exception is thrown.
*/
@Synchronized
fun finishInit(endPointClass: Class<*>) {
this.logger = KotlinLogging.logger(endPointClass.simpleName)
// get all classes/fields with @Rmi field annotation.
// The field type must also be added as an RMI type
// val fieldsWithRmiAnnotation = AnnotationDetector.scanClassPath()
// .forAnnotations(Rmi::class.java)
// .on(ElementType.FIELD)
// .collect(AnnotationDefaults.getField)
//
// fieldsWithRmiAnnotation.forEach { field ->
// val fieldType = field.type
// require(fieldType.isInterface) { "RMI annotated fields must be an interface!" }
//
// logger.debug { "Adding additional @Rmi field annotation for RMI registration" }
//
// // now we add this field type as an RMI serializable
//// registerRmi(fieldType, fieldType)
// }
initialized = true
// save off the class-resolver, so we can lookup the class <-> id relationships
classResolver = globalKryo.classResolver
logger = KotlinLogging.logger(endPointClass.simpleName)
if (!initialized.compareAndSet(expect = false, update = true)) {
logger.error("Unable to initialize serialization more than once!")
return
}
// this will set up the class registration information
initKryo()
// now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations based on ID
// in order to get the ID's, these have to be registered with a kryo instance!
@ -362,10 +331,11 @@ open class Serialization(private val references: Boolean = true,
classesToRegister.forEach { registration ->
val id = registration.id
// if the id == -1, it means that this registration was ignored! We don't want to include it -- but we want to log
// that something happened.
if (id == -1) {
logger.debug(registration.info)
// if the id == -1, it means that this registration was ignored!
//
// We don't want to include it -- but we want to log that something happened (the info has been customized)
if (id == ClassRegistration.IGNORE_REGISTRATION) {
logger.warn(registration.info)
return@forEach
}
@ -416,18 +386,20 @@ open class Serialization(private val references: Boolean = true,
// so it is super trivial to map out all possible, relevant types
val kryoId = classRegistration.id
if (classRegistration is ClassRegistrationIfaceAndImpl) {
if (classRegistration is ClassRegistrationForRmi) {
// on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation!
val implClass = classRegistration.implClass
// RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, classRegistration.implClass, kryoId)
RmiUtils.getCachedMethods(logger, globalKryo, useAsm, classRegistration.clazz, implClass, kryoId)
// we ALSO have to cache the instantiator for these, since these are used to create remote objects
val instantiator = globalKryo.instantiatorStrategy.newInstantiatorOf(classRegistration.implClass)
@Suppress("UNCHECKED_CAST")
rmiIfaceToInstantiator[kryoId] = instantiator as ObjectInstantiator<Any>
rmiHolder.idToInstantiator[kryoId] =
globalKryo.instantiatorStrategy.newInstantiatorOf(implClass) as ObjectInstantiator<Any>
// finally, we must save this ID, to tell the remote connection that their interface serializer must change to support
// receiving an RMI impl object as a proxy object
@ -452,7 +424,6 @@ open class Serialization(private val references: Boolean = true,
// save this as a byte array (so class registration validation during connection handshake is faster)
val output = AeronOutput()
try {
globalKryo.writeCompressed(logger, output, registrationDetails.toTypedArray())
} catch (e: Exception) {
@ -465,23 +436,6 @@ open class Serialization(private val references: Boolean = true,
output.close()
}
/**
* NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
* (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the
* server, thus preventing them from trying to probe the server data structures.
*
* @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo
*/
fun getKryoRegistrationDetails(): ByteArray {
return savedRegistrationDetails
}
/**
* @return the details of all registration IDs for RMI iface serializer rewrites
*/
fun getKryoRmiIds(): IntArray {
return savedKryoIdsForRmi
}
/**
* NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
@ -560,14 +514,14 @@ open class Serialization(private val references: Boolean = true,
if (!serializerMatches) {
// JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff explicitly
when {
serializerServer == rmiClientReverseSerializer::class.java.name -> {
// this is for when the impl is on server, and iface is on client
serializerServer == rmiServerSerializer::class.java.name -> {
// this is for when the rmi-server is on the server, and the rmi-client is on client
// after this check, we tell the client that this ID is for RMI
// This necessary because only 1 side registers RMI iface/impl info
}
serializerClient == rmiClientReverseSerializer::class.java.name -> {
// this is for when the impl is on client, and iface is on server
serializerClient == rmiServerSerializer::class.java.name -> {
// this is for when the rmi-server is on client, and the rmi-client is on server
// after this check, we tell MYSELF (the server) that this id is for RMI
// This necessary because only 1 side registers RMI iface/impl info
@ -605,10 +559,50 @@ open class Serialization(private val references: Boolean = true,
input.close()
}
return false
}
/**
* Called when the kryo IDs are updated to be the RMI reverse serializer.
*
* NOTE: the IFACE must already be registered!!
*/
suspend fun <CONNECTION : Connection> updateKryoIdsForRmi(connection: CONNECTION,
rmiModificationIds: IntArray,
onError: suspend (String) -> Unit) {
val typeName = connection.endPoint.type.simpleName
// store all of the classes + kryo registration IDs
rmiModificationIds.forEach {
if (!existingRmiIds.contains(it)) {
existingRmiIds.add(it)
// have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on
// a single thread.
// NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct
// because they are set on initialization
val registration = globalKryo.getRegistration(it)
val regMessage = "$typeName-side RMI serializer for registration $it -> ${registration.type}"
if (registration.type.isInterface) {
logger.debug { "Modifying $regMessage" }
// RMI must be with an interface. If it's not an interface then something is wrong
registration.serializer = rmiServerSerializer
} else {
// note: one way that this can be called is when BOTH the client + server register the same way for RMI IDs. When
// the endpoint serialization is initialized, we also add the RMI IDs to this list, so we don't have to worry about this specific
// scenario
onError("Ignoring unsafe modification of $regMessage")
}
}
}
}
/**
* @return takes a kryo instance from the pool, or creates one if the pool was empty
*/
@ -631,11 +625,14 @@ open class Serialization(private val references: Boolean = true,
fun getKryoIdForRmiClient(interfaceClass: Class<*>): Int {
require (interfaceClass.isInterface) { "Can only get the kryo IDs for RMI on an interface!" }
// if we are the RMI-server, we kryo-register the impl
// if we are the RMI-client, we kryo-register the iface (this is us! This method is only called on the rmi-client)
// BI-DIRECTIONAL RMI -- WILL NOT CALL THIS METHOD!
// for RMI, we store the IMPL class in the class registration -- not the iface!
return classResolver.getRegistration(interfaceClass).id
// the rmi-server will have iface+impl id's
// the rmi-client will have iface id's
val id = rmiHolder.ifaceToId[interfaceClass]!!
require(id != INVALID_KRYO_ID) { "Registration for $interfaceClass is invalid!!" }
return id
}
/**
@ -645,17 +642,19 @@ open class Serialization(private val references: Boolean = true,
*/
fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any {
try {
if (objectParameters == null) {
return rmiIfaceToInstantiator[interfaceClassId].newInstance()
if (objectParameters.isNullOrEmpty()) {
// simple, easy, fast.
return rmiHolder.idToInstantiator[interfaceClassId].newInstance()
}
val size = objectParameters.size
// we have to get the constructor for this object.
val clazz = classResolver.getRegistration(interfaceClassId).type
val constructors = clazz.declaredConstructors
val clazz: Class<*> = rmiHolder.idToImpl[interfaceClassId] ?:
return IllegalArgumentException("Cannot create RMI object for kryo interfaceClassId: $interfaceClassId (no class exists)")
// now have to find the closest match.
val constructors = clazz.declaredConstructors
val size = objectParameters.size
val matchedBySize = constructors.filter { it.parameterCount == size }
if (matchedBySize.size == 1) {
@ -681,7 +680,7 @@ open class Serialization(private val references: Boolean = true,
// find the constructor with the highest match
matchedByType.sortByDescending { it.first }
return matchedByType[0].second.newInstance(*objectParameters)
} catch(e: Exception) {
} catch (e: Exception) {
return e
}
}
@ -693,15 +692,6 @@ open class Serialization(private val references: Boolean = true,
return methodCache[classId]
}
/**
* @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client
* and server, even if in the same JVM. This only attempts to register once.
*/
@Synchronized
fun initialized(): Boolean {
return initialized
}
/**
* # BLOCKING
*
@ -790,36 +780,6 @@ open class Serialization(private val references: Boolean = true,
}
}
suspend fun <CONNECTION: Connection> updateKryoIdsForRmi(connection: CONNECTION, rmiModificationIds: IntArray, onError: suspend (String) -> Unit) {
val typeName = connection.endPoint.type.simpleName
rmiModificationIds.forEach {
if (!existingRmiIds.contains(it)) {
existingRmiIds.add(it)
// have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on
// a single thread.
// NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct
val registration = globalKryo.getRegistration(it)
val regMessage = "$typeName-side RMI serializer for registration $it -> ${registration.type}"
if (registration.type.isInterface) {
logger.debug {
"Modifying $regMessage"
}
// RMI must be with an interface. If it's not an interface then something is wrong
registration.serializer = rmiClientReverseSerializer
} else {
// note: one way that this can be called is when BOTH the client + server register the same way for RMI IDs. When
// the endpoint serialization is initialized, we also add the RMI IDs to this list, so we don't have to worry about this specific
// scenario
onError("Attempting an unsafe modification of $regMessage")
}
}
}
}
// NOTE: These following functions are ONLY called on a single thread!
fun readMessage(buffer: DirectBuffer, offset: Int, length: Int): Any? {
return globalKryo.read(buffer, offset, length)

View File

@ -16,10 +16,8 @@
package dorkboxTest.network.rmi
import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.Rmi
import dorkbox.util.exceptions.SecurityException
import dorkboxTest.network.BaseTest
import kotlinx.coroutines.runBlocking
@ -30,18 +28,13 @@ import java.util.concurrent.atomic.AtomicInteger
class RmiOverrideAndProxyTest : BaseTest() {
@Suppress("unused", "RedundantSuspendModifier")
class RmiNestedTest : BaseTest() {
companion object {
private val idCounter = AtomicInteger()
}
@Test
@Throws(SecurityException::class, IOException::class)
fun rmiNetwork() {
rmi()
}
@Test
@Throws(SecurityException::class, IOException::class)
fun rmiIPC() {
@ -79,15 +72,13 @@ class RmiOverrideAndProxyTest : BaseTest() {
* The implType (if it exists, with the same name, and with the same signature + connection parameter) will be called from the interface
* instead of the method that would NORMALLY be called.
*/
fun rmi(config: (Configuration) -> Unit = {}) {
@Test
fun biDirectionalDoubleRmi() {
run {
val configuration = serverConfig()
config(configuration)
configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java)
configuration.serialization.register(TestObject::class.java) // the iface is again, on purpose to verify registration orders!
configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java)
configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java)
// configuration.serialization.register(OtherObjectImpl::class.java) // registered because this class is sent over the wire
val server = Server<Connection>(configuration)
addEndPoint(server)
@ -110,16 +101,13 @@ class RmiOverrideAndProxyTest : BaseTest() {
run {
val configuration = clientConfig()
config(configuration)
configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java)
configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java)
configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java)
// configuration.serialization.register(OtherObjectImpl::class.java) // registered because this class is sent over the wire
val client = Client<Connection>(configuration)
addEndPoint(client)
client.onConnect { connection ->
connection.logger.error("Connected")
connection.createObject<TestObject> { rmiId, remoteObject ->
@ -157,6 +145,145 @@ class RmiOverrideAndProxyTest : BaseTest() {
waitForThreads()
}
@Test
fun doubleRmi() {
run {
val configuration = serverConfig()
configuration.serialization.registerRmi(TestObject::class.java, TestObjectAnnotImpl::class.java)
configuration.serialization.registerRmi(OtherObject::class.java, OtherObjectImpl::class.java)
val server = Server<Connection>(configuration)
addEndPoint(server)
server.onMessage<OtherObject> { connection, message ->
// The test is complete when the client sends the OtherObject instance.
// this 'object' is the REAL object, not a proxy, because this object is created within this connection.
if (message.value() == 12.34f) {
stopEndPoints()
} else {
Assert.fail("Incorrect object value")
}
}
runBlocking {
server.bind(false)
}
}
run {
val configuration = clientConfig()
configuration.serialization.register(TestObject::class.java)
configuration.serialization.register(OtherObject::class.java)
val client = Client<Connection>(configuration)
addEndPoint(client)
client.onConnect { connection ->
connection.logger.error("Connected")
connection.createObject<TestObject> { rmiId, remoteObject ->
connection.logger.error("Starting test")
remoteObject.setValue(43.21f)
// Normal remote method call.
Assert.assertEquals(43.21f, remoteObject.other(), .0001f)
// Make a remote method call that returns another remote proxy object.
// the "test" object exists in the REMOTE side, as does the "OtherObject" that is created.
// here we have a proxy to both of them.
val otherObject: OtherObject = remoteObject.getOtherObject()
// Normal remote method call on the second object.
otherObject.setValue(12.34f)
val value = otherObject.value()
Assert.assertEquals(12.34f, value, .0001f)
// make sure the "local" object and the "remote" object have the same values
Assert.assertEquals(12.34f, remoteObject.getOtherValue(), .0001f)
// When a proxy object is sent, the other side receives its ACTUAL object (not a proxy of it), because
// that is where that object actually exists.
connection.send(otherObject)
}
}
runBlocking {
client.connect(LOOPBACK, 5000)
}
}
waitForThreads(999999)
}
@Test
fun singleRmi() {
run {
val configuration = serverConfig()
configuration.serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java)
configuration.serialization.register(OtherObjectImpl::class.java)
val server = Server<Connection>(configuration)
addEndPoint(server)
server.onMessage<OtherObject> { connection, message ->
// The test is complete when the client sends the OtherObject instance.
// this 'object' is the REAL object
if (message.value() == 43.21f) {
stopEndPoints()
} else {
Assert.fail("Incorrect object value")
}
}
runBlocking {
server.bind(false)
}
}
run {
val configuration = clientConfig()
configuration.serialization.register(TestObject::class.java)
configuration.serialization.register(OtherObjectImpl::class.java)
val client = Client<Connection>(configuration)
addEndPoint(client)
client.onConnect { connection ->
connection.logger.error("Connected")
connection.createObject<TestObject> { rmiId, remoteObject ->
connection.logger.error("Starting test")
remoteObject.setOtherValue(43.21f)
// Normal remote method call.
Assert.assertEquals(43.21f, remoteObject.getOtherValue(), .0001f)
// real object
val otherObject: OtherObject = remoteObject.getOtherObject()
// Normal remote method call on the second object.
val value = otherObject.value()
Assert.assertEquals(43.21f, value, .0001f)
// When a proxy object is sent, the other side receives its ACTUAL object (not a proxy of it), because
// that is where that object actually exists.
connection.send(otherObject)
}
}
runBlocking {
client.connect(LOOPBACK, 5000)
}
}
waitForThreads(999999)
}
private interface TestObject {
suspend fun setValue(aFloat: Float)
suspend fun setOtherValue(aFloat: Float)
@ -174,7 +301,6 @@ class RmiOverrideAndProxyTest : BaseTest() {
@Transient
private val ID = idCounter.getAndIncrement()
@Rmi
private val otherObject: OtherObject = OtherObjectImpl()
private var aFloat = 0f
@ -204,10 +330,47 @@ class RmiOverrideAndProxyTest : BaseTest() {
}
override fun getOtherObject(): OtherObject {
return otherObject
}
override fun hashCode(): Int {
return ID
}
}
private class TestObjectAnnotImpl : TestObject {
@Transient
private val ID = idCounter.getAndIncrement()
private val otherObject: OtherObject = OtherObjectImpl()
private var aFloat = 0f
override suspend fun setValue(aFloat: Float) {
throw RuntimeException("Whoops!")
}
fun getOtherObject(connection: Connection): OtherObject {
suspend fun setValue(connection: Connection, aFloat: Float) {
connection.logger.error("receiving")
this.aFloat = aFloat
}
override suspend fun setOtherValue(aFloat: Float) {
otherObject.setValue(aFloat)
}
override suspend fun getOtherValue(): Float {
return otherObject.value()
}
override fun other(): Float {
throw RuntimeException("Whoops!")
}
fun other(connection: Connection): Float {
return aFloat
}
override fun getOtherObject(): OtherObject {
return otherObject
}