Added RMI serialization syncrhonization between the server/client during the handshake. Fixed serialization for sending RMI objects in both directions. Removed NetworkSerializationManager, and instead pass the Serialization implementation.

This commit is contained in:
nathan 2020-08-26 15:34:04 +02:00
parent 570aeee52c
commit b6f337d0e6
35 changed files with 464 additions and 355 deletions

View File

@ -265,6 +265,10 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
throw exception
}
// before we do anything else, we have to correct the RMI serializers, as necessary.
val rmiModificationIds = connectionInfo.kryoIdsForRmi
updateKryoIdsForRmi(newConnection, rmiModificationIds)
connection = newConnection
connections.add(newConnection)
@ -514,7 +518,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// NOTE: It's not possible to have reified inside a virtual function
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiConnectionSupport.getRemoteObject(getConnection(), this, objectId, Iface::class.java)
return rmiConnectionSupport.getRemoteObject(getConnection(), objectId, Iface::class.java)
}
/**
@ -644,6 +648,6 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// NOTE: It's not possible to have reified inside a virtual function
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiGlobalSupport.getGlobalRemoteObject(getConnection(), this, objectId, Iface::class.java)
return rmiGlobalSupport.getGlobalRemoteObject(getConnection(), objectId, Iface::class.java)
}
}

View File

@ -18,7 +18,6 @@ package dorkbox.network
import dorkbox.network.aeron.CoroutineBackoffIdleStrategy
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.network.storage.PropertyStore
import dorkbox.network.storage.SettingsStore
@ -96,7 +95,7 @@ open class Configuration {
/**
* Specify the serialization manager to use.
*/
var serialization: NetworkSerializationManager = Serialization.DEFAULT()
var serialization: Serialization = Serialization.DEFAULT()
/**
* The idle strategy used when polling the Media Driver for new messages. BackOffIdleStrategy is the DEFAULT.

View File

@ -289,6 +289,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*/
suspend fun close() {
if (isClosed.compareAndSet(expect = false, update = true)) {
// the server 'handshake' connection info is already cleaned up before this is called
subscription.close()
val closeTimeoutTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
@ -480,7 +482,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// NOTE: It's not possible to have reified inside a virtual function
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiConnectionSupport.getRemoteObject(this, endPoint, objectId, Iface::class.java)
return rmiConnectionSupport.getRemoteObject(this, objectId, Iface::class.java)
}
/**
@ -504,7 +506,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// NOTE: It's not possible to have reified inside a virtual function
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
return rmiConnectionSupport.rmiGlobalSupport.getGlobalRemoteObject(this, endPoint, objectId, Iface::class.java)
return rmiConnectionSupport.rmiGlobalSupport.getGlobalRemoteObject(this, objectId, Iface::class.java)
}
/**

View File

@ -177,16 +177,17 @@ internal class CryptoManagement(val logger: KLogger,
return SecretKeySpec(hash.digest(), "AES")
}
fun encrypt(publicationPort: Int,
fun encrypt(clientPublicKeyBytes: ByteArray,
publicationPort: Int,
subscriptionPort: Int,
connectionSessionId: Int,
connectionStreamId: Int,
clientPublicKeyBytes: ByteArray): ByteArray {
kryoRmiIds: IntArray): ByteArray {
val secretKeySpec = generateAesKey(clientPublicKeyBytes, clientPublicKeyBytes, publicKeyBytes)
val iv = ByteArray(GCM_IV_LENGTH)
secureRandom.nextBytes(iv);
secureRandom.nextBytes(iv)
val gcmParameterSpec = GCMParameterSpec(GCM_TAG_LENGTH * 8, iv)
aesCipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, gcmParameterSpec)
@ -197,6 +198,10 @@ internal class CryptoManagement(val logger: KLogger,
data.writeInt(connectionStreamId)
data.writeInt(publicationPort)
data.writeInt(subscriptionPort)
data.writeInt(kryoRmiIds.size)
kryoRmiIds.forEach {
data.writeInt(it)
}
val bytes = data.toBytes()
@ -226,12 +231,25 @@ internal class CryptoManagement(val logger: KLogger,
val data = AeronInput(aesCipher.doFinal(secretBytes))
val sessionId = data.readInt()
val streamId = data.readInt()
val publicationPort = data.readInt()
val subscriptionPort = data.readInt()
val rmiIds = mutableListOf<Int>()
val rmiIdSize = data.readInt()
for (i in 0 until rmiIdSize) {
rmiIds.add(data.readInt())
}
// now read data off
return ClientConnectionInfo(sessionId = data.readInt(),
streamId = data.readInt(),
publicationPort = data.readInt(),
subscriptionPort = data.readInt(),
publicKey = serverPublicKeyBytes)
return ClientConnectionInfo(sessionId = sessionId,
streamId = streamId,
publicationPort = publicationPort,
subscriptionPort = subscriptionPort,
publicKey = serverPublicKeyBytes,
kryoIdsForRmi = rmiIds.toIntArray())
}
override fun hashCode(): Int {

View File

@ -20,13 +20,14 @@ import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.client.ClientRejectedException
import dorkbox.network.connection.ping.PingMessage
import dorkbox.network.ipFilter.IpFilterRule
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.RmiManagerGlobal
import dorkbox.network.rmi.messages.RmiMessage
import dorkbox.network.serialization.KryoExtra
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.network.storage.SettingsStore
import dorkbox.util.NamedThreadFactory
import dorkbox.util.exceptions.SecurityException
@ -43,6 +44,7 @@ import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import java.io.File
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
@ -124,7 +126,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
/**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basics.
*/
val serialization: NetworkSerializationManager
val serialization: Serialization
private val sendIdleStrategy: CoroutineIdleStrategy
@ -141,7 +143,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// we only want one instance of these created. These will be called appropriately
val settingsStore: SettingsStore
internal val globalThreadUnsafeKryo: KryoExtra = config.serialization.takeKryo()
// list of already seen client RMI ids (which the server might not have registered as RMI types).
private var alreadySeenClientRmiIds = CopyOnWriteArrayList<Int>()
private val networkReadKryo: KryoExtra = config.serialization.takeKryo()
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch, config.serialization)
@ -443,9 +448,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
try {
globalThreadUnsafeKryo.write(message)
networkReadKryo.write(message)
val buffer = globalThreadUnsafeKryo.writerBuffer
val buffer = networkReadKryo.writerBuffer
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
@ -484,7 +489,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
*/
fun readHandshakeMessage(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Any? {
try {
val message = globalThreadUnsafeKryo.read(buffer, offset, length)
val message = networkReadKryo.read(buffer, offset, length)
logger.trace {
"[${header.sessionId()}] received: $message"
}
@ -521,7 +526,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
val message: Any?
try {
message = globalThreadUnsafeKryo.read(buffer, offset, length, connection)
message = networkReadKryo.read(buffer, offset, length, connection)
logger.trace {
// The sessionId is globally unique, and is assigned by the server.
val sessionId = header.sessionId()
@ -710,4 +715,29 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
shutdownLatch.countDown()
}
}
suspend fun updateKryoIdsForRmi(connection: CONNECTION, rmiModificationIds: IntArray) {
rmiModificationIds.forEach {
if (!alreadySeenClientRmiIds.contains(it)) {
alreadySeenClientRmiIds.add(it)
// have to modify the network read kryo with the correct registration id -> serializer info. This is a GLOBAL change made on
// a single thread.
// NOTE: This change will ONLY modify the network-read kryo. This is all we need to modify. The write kryo's will already be correct
val registration = networkReadKryo.getRegistration(it)
val regMessage = "${type.simpleName}-side RMI serializer for registration $it -> ${registration.type}"
if (registration.type.isInterface) {
logger.debug {
"Modifying $regMessage"
}
// RMI must be with an interface. If it's not an interface then something is wrong
registration.serializer = serialization.rmiClientReverseSerializer
} else {
listenerManager.notifyError(connection,
ClientRejectedException("Attempting an unsafe modification of $regMessage"))
}
}
}
}
}

View File

@ -21,7 +21,8 @@ internal class ClientConnectionInfo(val subscriptionPort: Int,
val publicationPort: Int,
val sessionId: Int,
val streamId: Int,
val publicKey: ByteArray) {
val publicKey: ByteArray,
val kryoIdsForRmi: IntArray) {
fun log(handshakeSessionId: Int, logger: Logger) {
logger.debug("[{}] connected {}|{} (encrypted {})", handshakeSessionId, publicationPort, subscriptionPort, sessionId)

View File

@ -104,7 +104,8 @@ internal class ClientHandshake<CONNECTION: Connection>(private val logger: KLogg
val registrationMessage = HandshakeMessage.helloFromClient(
oneTimePad = oneTimePad,
publicKey = config.settingsStore.getPublicKey()!!,
registrationData = config.serialization.getKryoRegistrationDetails()
registrationData = config.serialization.getKryoRegistrationDetails(),
registrationRmiIdData = config.serialization.getKryoRmiIds()
)

View File

@ -19,6 +19,7 @@ package dorkbox.network.handshake
* Internal message to handle the connection registration process
*/
internal class HandshakeMessage private constructor() {
// the public key is used to encrypt the data in the handshake
var publicKey: ByteArray? = null
@ -46,19 +47,7 @@ internal class HandshakeMessage private constructor() {
// the client sends it's registration data to the server to make sure that the registered classes are the same between the client/server
var registrationData: ByteArray? = null
// NOTE: this is for ECDSA!
// var eccParameters: IESParameters? = null
// > 0 when we are ready to setup the connection (hasMore will always be false if this is >0). 0 when we are ready to connect
// ALSO used if there are fragmented frames for registration data (since we have to split it up to fit inside a single UDP packet without fragmentation)
// var upgradeType = 0.toByte()
// true when we are fully upgraded
// var upgraded = false
var registrationRmiIdData: IntArray? = null
companion object {
const val INVALID = -1
@ -67,12 +56,13 @@ internal class HandshakeMessage private constructor() {
const val DONE = 2
const val DONE_ACK = 3
fun helloFromClient(oneTimePad: Int, publicKey: ByteArray, registrationData: ByteArray): HandshakeMessage {
fun helloFromClient(oneTimePad: Int, publicKey: ByteArray, registrationData: ByteArray, registrationRmiIdData: IntArray): HandshakeMessage {
val hello = HandshakeMessage()
hello.state = HELLO
hello.oneTimePad = oneTimePad
hello.publicKey = publicKey
hello.registrationData = registrationData
hello.registrationRmiIdData = registrationRmiIdData
return hello
}

View File

@ -97,6 +97,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
val serialization = config.serialization
try {
// VALIDATE:: Check to see if there are already too many clients connected.
@ -115,7 +116,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
// VALIDATE:: make sure the serialization matches between the client/server!
if (!config.serialization.verifyKryoRegistration(message.registrationData!!)) {
if (!serialization.verifyKryoRegistration(message.registrationData!!)) {
listenerManager.notifyError(ClientRejectedException("Connection from $clientAddressString not allowed! Registration data mismatch."))
return
}
@ -202,11 +203,11 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
"Creating new connection $clientConnection"
}
val connection: Connection = server.newConnection(ConnectionParams(server, clientConnection, validateRemoteAddress))
val connection = server.newConnection(ConnectionParams(server, clientConnection, validateRemoteAddress)) as CONNECTION
// VALIDATE:: are we allowed to connect to this server (now that we have the initial server information)
@Suppress("UNCHECKED_CAST")
val permitConnection = listenerManager.notifyFilter(connection as CONNECTION)
val permitConnection = listenerManager.notifyFilter(connection)
if (!permitConnection) {
// have to unwind actions!
connectionsPerIpCounts.getAndDecrement(clientAddress)
@ -225,15 +226,35 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
///////////////
//// RMI
///////////////
// if necessary (and only for RMI id's that have never been seen before) we want to re-write our kryo information
val rmiModificationIds = message.registrationRmiIdData!!
server.updateKryoIdsForRmi(connection, rmiModificationIds)
///////////////
/// HANDSHAKE
///////////////
// The one-time pad is used to encrypt the session ID, so that ONLY the correct client knows what it is!
val successMessage = HandshakeMessage.helloAckToClient(sessionId)
// if necessary, we also send the kryo RMI id's that are registered as RMI on this endpoint, but maybe not on the other endpoint
// now create the encrypted payload, using ECDH
successMessage.registrationData = server.crypto.encrypt(publicationPort,
successMessage.registrationData = server.crypto.encrypt(clientPublicKeyBytes!!,
publicationPort,
subscriptionPort,
connectionSessionId,
connectionStreamId,
clientPublicKeyBytes!!)
serialization.getKryoRmiIds())
successMessage.publicKey = server.crypto.publicKeyBytes

View File

@ -20,13 +20,13 @@ import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.collections.LockFreeIntMap
import mu.KLogger
internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
private val serialization: Serialization) : RmiObjectCache(logger) {
// It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections!
private val proxyObjects = LockFreeIntMap<RemoteObject>()
@ -49,12 +49,16 @@ internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
/**
* on the connection+client to get a connection-specific remote object (that exists on the server/client)
*/
fun <Iface> getRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class<Iface>): Iface {
fun <Iface> getRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class<Iface>): Iface {
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = RmiManagerGlobal.createProxyObject(false, connection, serialization, rmiGlobalSupport.rmiResponseManager,
endPoint.type.simpleName, objectId, interfaceClass)
proxyObject = RmiManagerGlobal.createProxyObject(false,
connection,
serialization,
rmiGlobalSupport.rmiResponseManager,
objectId,
interfaceClass)
saveProxyObject(objectId, proxyObject)
}

View File

@ -24,7 +24,7 @@ import dorkbox.network.rmi.messages.GlobalObjectCreateRequest
import dorkbox.network.rmi.messages.GlobalObjectCreateResponse
import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.classes.ClassHelper
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
@ -34,7 +34,7 @@ import java.util.*
internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
internal val serialization: Serialization) : RmiObjectCache(logger) {
companion object {
/**
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
@ -52,9 +52,11 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
* @param interfaceClass this is the RMI interface class
*/
internal fun createProxyObject(isGlobalObject: Boolean,
connection: Connection, serialization: NetworkSerializationManager,
responseManager: RmiResponseManager, namePrefix: String,
rmiId: Int, interfaceClass: Class<*>): RemoteObject {
connection: Connection,
serialization: Serialization,
responseManager: RmiResponseManager,
rmiId: Int,
interfaceClass: Class<*>): RemoteObject {
require(interfaceClass.isInterface) { "iface must be an interface." }
@ -63,7 +65,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
val classId = serialization.getClassId(interfaceClass)
val cachedMethods = serialization.getMethods(classId)
val name = "<${namePrefix}-proxy #$rmiId>"
val name = "<${connection.endPoint().type.simpleName}-proxy #$rmiId>"
// the ACTUAL proxy is created in the connection impl. Our proxy handler MUST BE suspending because of:
// 1) how we send data on the wire
@ -219,7 +221,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
isGlobal: Boolean,
rmiId: Int,
callback: suspend (Int, Any) -> Unit,
serialization: NetworkSerializationManager) {
serialization: Serialization) {
// we only create the proxy + execute the callback if the RMI id is valid!
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
@ -232,7 +234,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
// create the client-side proxy object, if possible. This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId)
if (proxyObject == null) {
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, endPoint.type.simpleName, rmiId, interfaceClass)
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, rmiId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject)
}
@ -250,13 +252,13 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
/**
* on the connection+client to get a global remote object (that exists on the server)
*/
fun <Iface> getGlobalRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class<Iface>): Iface {
fun <Iface> getGlobalRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class<Iface>): Iface {
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
// so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, endPoint.type.simpleName, objectId, interfaceClass)
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, objectId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject)
}

View File

@ -40,6 +40,13 @@ internal open class RmiObjectCache(logger: KLogger) {
return implObjects[rmiId] as T?
}
/**
* @return the ID registered for the specified object, or INVALID_RMI if not found.
*/
fun <T> getId(implObject: T): Int {
return implObjects.getId(implObject)
}
fun <T> removeImplObject(rmiId: Int): T? {
return implObjects.remove(rmiId) as T?
}

View File

@ -39,38 +39,54 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.IdentityMap
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.serialization.KryoExtra
/**
* this is to manage serializing proxy object objects across the wire...
* This is to manage serializing proxy object objects across the wire...
*
* SO the "rmi client" sends an RMI proxy object, and the "rmi server" reads an actual object
* This is when the RMI server sends an impl object to a client, the client must receive a proxy object (instead of the impl object)
*
* Serializes an object registered with the RmiBridge so the receiving side gets a [RemoteObject] proxy rather than the bytes for the
* serialized object.
* NOTE: this works because the CLIENT can never send the actual iface object, if it's RMI, it will send the java Proxy object instead.
* The SERVER can never send the iface object, it will send the IMPL object instead
*
* @author Nathan Sweet <misc></misc>@n4te.com>
* What we do is on the server, REWRITE the kryo ID for the impl so that it will send just the rmi ID instead of the object
* on the client, this SAME kryo ID must have this serializer as well, so the proxy object is re-assembled.
*
* Kryo serialization works by inspecting the field VALUE type, not the field DEFINED type... So if you send an actual object, you must
* register specifically for the implementation object.
*
*
* To recap:
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/
class RmiObjectSerializer(private val rmiImplToIface: IdentityMap<Class<*>, Class<*>>) : Serializer<Any>(false) {
class RmiClientReverseSerializer(private val rmiImplToIface: IdentityMap<Class<*>, Class<*>>) : Serializer<Any>(false) {
override fun write(kryo: Kryo, output: Output, `object`: Any) {
println(" FIX ObjectResponseSerializer ")
val kryoExtra = kryo as KryoExtra
// val id = kryoExtra.rmiSupport.getRegisteredId(`object`) //
// output.writeInt(id, true)
output.writeInt(0, true)
val rmiConnectionSupport = kryoExtra.connection.rmiConnectionSupport
// have to write what the rmi ID is ONLY. We have to find out if it's a global object or connection scope object!
// check connection scope first
var id = rmiConnectionSupport.getId(`object`)
// check global scope second
if (id == RemoteObjectStorage.INVALID_RMI) {
id = rmiConnectionSupport.rmiGlobalSupport.getId(`object`)
}
output.writeInt(id, true)
}
override fun read(kryo: Kryo, input: Input, implementationType: Class<*>): Any? {
println(" FIX ObjectResponseSerializer ")
override fun read(kryo: Kryo, input: Input, iface: Class<*>): Any? {
val kryoExtra = kryo as KryoExtra
val objectID = input.readInt(true)
// We have to lookup the iface, since the proxy object requires it
val iface = rmiImplToIface.get(implementationType)
val connection = kryoExtra.connection
// return kryoExtra.rmiSupport.getProxyObject(connection, objectID, iface)
return null
return connection.rmiConnectionSupport.getRemoteObject(connection, objectID, iface)
}
}
/// TODO: FIX THIS CLASS MAYBE!

View File

@ -24,11 +24,28 @@ import dorkbox.network.serialization.KryoExtra
import java.lang.reflect.Proxy
/**
* this is to manage serializing proxy object objects across the wire...
* This is to manage serializing proxy object objects across the wire...
*
* SO the "rmi client" sends an RMI proxy object, and the "rmi server" reads an actual object
* This is when the RMI server sends an impl object to a client, the client must receive a proxy object (instead of the impl object)
*
* NOTE: this works because the CLIENT can never send the actual iface object, if it's RMI, it will send the java Proxy object instead.
* The SERVER can never send the iface object, it will send the IMPL object instead
*
* What we do is on the server, REWRITE the kryo ID for the impl so that it will send just the rmi ID instead of the object
* on the client, this SAME kryo ID must have this serializer as well, so the proxy object is re-assembled.
*
* Kryo serialization works by inspecting the field VALUE type, not the field DEFINED type... So if you send an actual object, you must
* register specifically for the implementation object.
*
*
* To recap:
* rmi-client: send proxy -> RmiIfaceSerializer -> network -> RmiIfaceSerializer -> impl object (rmi-server)
* rmi-server: send impl -> RmiImplSerializer -> network -> RmiImplSerializer -> proxy object (rmi-client)
*
* During the handshake, if the impl object 'lives' on the CLIENT, then the client must tell the server that the iface ID must use this serializer.
* If the impl object 'lives' on the SERVER, then the server must tell the client about the iface ID
*/
class RmiClientRequestSerializer : Serializer<Any>() {
class RmiClientSerializer : Serializer<Any>() {
override fun write(kryo: Kryo, output: Output, proxyObject: Any) {
val handler = Proxy.getInvocationHandler(proxyObject) as RmiClient
output.writeBoolean(handler.isGlobal)

View File

@ -17,24 +17,14 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal open class ClassRegistration(var clazz: Class<*>) {
var id = 0
var serializer: Serializer<*>? = null
internal interface ClassRegistration {
var id: Int
val clazz: Class<*>
val serializer: Serializer<*>?
open fun register(kryo: KryoExtra) {
val registration = kryo.register(clazz)
id = registration.id
}
fun register(kryo: KryoExtra)
open fun info(): String {
return "Registered $id -> ${clazz.name}"
}
fun info(): String
fun getInfoArray(): Array<Any> {
return if (serializer != null) {
arrayOf(id, clazz.name, serializer!!::class.java.name)
} else {
arrayOf(id, clazz.name, "")
}
}
fun getInfoArray(): Array<Any>
}

View File

@ -17,10 +17,9 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz) {
init {
this.serializer = serializer
}
internal class ClassRegistration0(override val clazz: Class<*>,
override val serializer: Serializer<*>) : ClassRegistration {
override var id: Int = 0
override fun register(kryo: KryoExtra) {
id = kryo.register(clazz, serializer).id
@ -29,4 +28,8 @@ internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) :
override fun info(): String {
return "Registered $id -> ${clazz.name} using ${serializer?.javaClass?.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, serializer::class.java.name)
}
}

View File

@ -15,10 +15,10 @@
*/
package dorkbox.network.serialization
internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz) {
init {
this.id = id
}
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration1(override val clazz: Class<*>, override var id: Int) : ClassRegistration {
override val serializer: Serializer<*>? = null
override fun register(kryo: KryoExtra) {
kryo.register(clazz, id)
@ -27,4 +27,8 @@ internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(
override fun info(): String {
return "Registered $id -> (specified) ${clazz.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, "")
}
}

View File

@ -17,17 +17,16 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz) {
init {
this.serializer = serializer
this.id = id
}
internal class ClassRegistration2(override val clazz: Class<*>, override val serializer: Serializer<*>, override var id: Int) : ClassRegistration {
override fun register(kryo: KryoExtra) {
kryo.register(clazz, serializer, id)
}
override fun info(): String {
return "Registered $id -> (specified) ${clazz.name} using ${serializer?.javaClass?.name}"
return "Registered $id -> (specified) ${clazz.name} using ${serializer.javaClass.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, serializer::class.java.name)
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
internal open class ClassRegistration3(override var clazz: Class<*>) : ClassRegistration {
override var id = 0
override val serializer: Serializer<*>? = null
override fun register(kryo: KryoExtra) {
val registration = kryo.register(clazz)
id = registration.id
}
override fun info(): String {
return "Registered $id -> ${clazz.name}"
}
override fun getInfoArray(): Array<Any> {
return arrayOf(id, clazz.name, "")
}
}

View File

@ -15,20 +15,34 @@
*/
package dorkbox.network.serialization
import dorkbox.network.rmi.messages.RmiObjectSerializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass: Class<*>, rmiObjectSerializer: RmiObjectSerializer) :
ClassRegistration(implClass) {
internal class ClassRegistrationIfaceAndImpl(val ifaceClass: Class<*>,
val implClass: Class<*>,
override val serializer: RmiClientReverseSerializer) : ClassRegistration {
init {
this.serializer = rmiObjectSerializer
}
override var id: Int = 0
override val clazz: Class<*> = ifaceClass // this has to match what is defined on the rmi client
override fun register(kryo: KryoExtra) {
id = kryo.register(clazz, serializer).id
// have to get the ID for the interface (if it exists)
val registration = kryo.classResolver.getRegistration(ifaceClass)
if (registration != null) {
id = registration.id
// override that registration
kryo.register(implClass, serializer, id).id
} else {
id = kryo.register(implClass, serializer).id
}
}
override fun info(): String {
return "Registered $id -> (RMI) ${implClass.name}"
}
override fun getInfoArray(): Array<Any> {
// the info array has to match for the INTERFACE (not the impl!)
return arrayOf(id, ifaceClass.name, serializer::class.java.name)
}
}

View File

@ -1,154 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer
import dorkbox.network.rmi.CachedMethod
import dorkbox.util.serialization.SerializationManager
import org.agrona.DirectBuffer
interface NetworkSerializationManager : SerializationManager<DirectBuffer> {
/**
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
* If the class is already registered, the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
override fun <T> register(clazz: Class<T>): NetworkSerializationManager
/**
* Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID
* is already in use by a different type, a [KryoException] is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
override fun <T> register(clazz: Class<T>, id: Int): NetworkSerializationManager
/**
* Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered,
* the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): NetworkSerializationManager
/**
* Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is
* overwritten. If the ID is already in use by a different type, a [KryoException] is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): NetworkSerializationManager
/**
* @return takes a kryo instance from the pool.
*/
fun takeKryo(): KryoExtra
/**
* Returns a kryo instance to the pool.
*/
fun returnKryo(kryo: KryoExtra)
/**
* @return true if the remote kryo registration are the same as our own
*/
suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean
/**
* @return the details of all registration IDs -> Class name used by kryo
*/
fun getKryoRegistrationDetails(): ByteArray
/**
* Creates a NEW object implementation based on the KRYO interface ID.
*
* @return the corresponding implementation object
*/
fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any
/**
* Returns the Kryo class registration ID
*/
fun getClassId(iFace: Class<*>): Int
/**
* Returns the Kryo class from a registration ID
*/
fun getClassFromId(interfaceClassId: Int): Class<*>
/**
* Gets the RMI implementation based on the specified interface
*
* @return the corresponding implementation
*/
fun <T> getRmiImpl(iFace: Class<T>): Class<T>
/**
* There is additional overhead to using RMI.
*
* - This is for the side where the object lives
*
* This enables a us, the "server" to send objects to a "remote endpoint"
*
* This is NOT bi-directional, and this endpoint cannot access or create remote objects on the "remote client".
*
* @throws IllegalArgumentException if the iface/impl have previously been overridden
*/
fun <Iface, Impl : Iface> registerRmi(ifaceClass: Class<Iface>, implClass: Class<Impl>): NetworkSerializationManager
/**
* Gets the cached methods for the specified class ID
*/
fun getMethods(classId: Int): Array<CachedMethod>
/**
* Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration.
*/
suspend fun finishInit(endPointClass: Class<*>)
/**
* @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client
* and server, even if in the same JVM. This only attempts to register once.
*/
fun initialized(): Boolean
}

View File

@ -36,10 +36,11 @@ import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodRequestSerializer
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.RmiClientRequestSerializer
import dorkbox.network.rmi.messages.RmiObjectSerializer
import dorkbox.network.rmi.messages.RmiClientReverseSerializer
import dorkbox.network.rmi.messages.RmiClientSerializer
import dorkbox.os.OS
import dorkbox.util.serialization.SerializationDefaults
import dorkbox.util.serialization.SerializationManager
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import mu.KLogger
@ -75,7 +76,7 @@ import kotlin.coroutines.Continuation
* Kryo#newDefaultSerializer(Class)
*/
class Serialization(private val references: Boolean,
private val factory: SerializerFactory<*>?) : NetworkSerializationManager {
private val factory: SerializerFactory<*>?) : SerializationManager<DirectBuffer> {
companion object {
@ -118,6 +119,7 @@ class Serialization(private val references: Boolean,
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
// Object checking is performed during actual registration.
private val classesToRegister = mutableListOf<ClassRegistration>()
private lateinit var savedKryoIdsForRmi: IntArray
private lateinit var savedRegistrationDetails: ByteArray
/// RMI things
@ -132,9 +134,11 @@ class Serialization(private val references: Boolean,
private val methodRequestSerializer = MethodRequestSerializer()
private val methodResponseSerializer = MethodResponseSerializer()
private val objectRequestSerializer = RmiClientRequestSerializer()
private val objectResponseSerializer = RmiObjectSerializer(rmiImplToIface)
private val continuationRequestSerializer = ContinuationSerializer()
private val continuationSerializer = ContinuationSerializer()
private val rmiClientSerializer = RmiClientSerializer()
internal val rmiClientReverseSerializer = RmiClientReverseSerializer(rmiImplToIface)
@ -145,9 +149,10 @@ class Serialization(private val references: Boolean,
// reflectASM doesn't work on android
private val useAsm = !OS.isAndroid()
private val KRYO_COUNT = 32
init {
// reasonable size of available kryo's before coroutines are suspended during read/write
val KRYO_COUNT = 64
kryoPool = Channel(KRYO_COUNT)
}
@ -173,9 +178,9 @@ class Serialization(private val references: Boolean,
kryo.register(MethodResponse::class.java, methodResponseSerializer)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, objectRequestSerializer)
kryo.register(InvocationHandler::class.java as Class<Any>, rmiClientSerializer)
kryo.register(Continuation::class.java, continuationRequestSerializer)
kryo.register(Continuation::class.java, continuationSerializer)
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
classesToRegister.forEach { registration ->
@ -203,11 +208,11 @@ class Serialization(private val references: Boolean,
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>): NetworkSerializationManager {
override fun <T> register(clazz: Class<T>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class) call.")
} else {
classesToRegister.add(ClassRegistration(clazz))
classesToRegister.add(ClassRegistration3(clazz))
}
return this
@ -226,7 +231,7 @@ class Serialization(private val references: Boolean,
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, id: Int): NetworkSerializationManager {
override fun <T> register(clazz: Class<T>, id: Int): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, int) call.")
return this
@ -252,7 +257,7 @@ class Serialization(private val references: Boolean,
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): NetworkSerializationManager {
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer) call.")
return this
@ -280,7 +285,7 @@ class Serialization(private val references: Boolean,
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): NetworkSerializationManager {
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer, int) call.")
return this
@ -304,7 +309,7 @@ class Serialization(private val references: Boolean,
* @throws IllegalArgumentException if the iface/impl have previously been overridden
*/
@Synchronized
override fun <Iface, Impl : Iface> registerRmi(ifaceClass: Class<Iface>, implClass: Class<Impl>): NetworkSerializationManager {
fun <Iface, Impl : Iface> registerRmi(ifaceClass: Class<Iface>, implClass: Class<Impl>): Serialization {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate registerRmiImplementation(Class, Class) call.")
return this
@ -313,7 +318,7 @@ class Serialization(private val references: Boolean,
require(ifaceClass.isInterface) { "Cannot register an implementation for RMI access. It must be an interface." }
require(!implClass.isInterface) { "Cannot register an interface for RMI implementations. It must be an implementation." }
classesToRegister.add(ClassRegistrationIfaceAndImpl(ifaceClass, implClass, objectResponseSerializer))
classesToRegister.add(ClassRegistrationIfaceAndImpl(ifaceClass, implClass, rmiClientReverseSerializer))
// rmiIfaceToImpl tells us, "the server" how to create a (requested) remote object
// this MUST BE UNIQUE otherwise unexpected and BAD things can happen.
@ -332,14 +337,14 @@ class Serialization(private val references: Boolean,
* is already in use by a different type, an exception is thrown.
*/
@Synchronized
override suspend fun finishInit(endPointClass: Class<*>) {
fun finishInit(endPointClass: Class<*>) {
this.logger = KotlinLogging.logger(endPointClass.simpleName)
initialized = true
// initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done
// correctly and (if not) we are are notified on the initial thread (instead of on the network update thread)
val kryo = takeKryo()
val kryo = initKryo()
// save off the class-resolver, so we can lookup the class <-> id relationships
classResolver = kryo.classResolver
@ -349,7 +354,7 @@ class Serialization(private val references: Boolean,
// now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations based on ID
// in order to get the ID's, these have to be registered with a kryo instance!
val mergedRegistrations = mutableListOf<ClassRegistration>()
classesToRegister.forEach { registration ->
classesToRegister.forEach { registration ->
val id = registration.id
// if we ALREADY contain this registration (based ONLY on ID), then overwrite the existing one and REMOVE the current one
@ -389,32 +394,47 @@ class Serialization(private val references: Boolean,
}
}
val kryoIdsForRmi = mutableListOf<Int>()
classesToRegister.forEach { classRegistration ->
// now save all of the registration IDs for quick verification/access
registrationDetails.add(classRegistration.getInfoArray())
// we should cache RMI methods! We don't always know if something is RMI or not (from just how things are registered...)
// so it is super trivial to map out all possible, relevant types
val kryoId = classRegistration.id
if (classRegistration is ClassRegistrationIfaceAndImpl) {
// on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation!
methodCache[classRegistration.id] =
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.clazz, classRegistration.implClass, classRegistration.id)
// RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.ifaceClass, classRegistration.implClass, kryoId)
// we ALSO have to cache the instantiator for these, since these are used to create remote objects
val instantiator = kryo.instantiatorStrategy.newInstantiatorOf(classRegistration.implClass)
@Suppress("UNCHECKED_CAST")
rmiIfaceToInstantiator[classRegistration.id] = instantiator as ObjectInstantiator<Any>
rmiIfaceToInstantiator[kryoId] = instantiator as ObjectInstantiator<Any>
// finally, we must save this ID, to tell the remote connection that their interface serializer must change to support
// receiving an RMI impl object as a proxy object
kryoIdsForRmi.add(kryoId)
} else if (classRegistration.clazz.isInterface) {
// on the "RMI client"
methodCache[classRegistration.id] =
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.clazz, null, classRegistration.id)
// non-RMI method caching
methodCache[kryoId] =
RmiUtils.getCachedMethods(logger, kryo, useAsm, classRegistration.clazz, null, kryoId)
}
if (classRegistration.id > 65000) {
if (kryoId > 65000) {
throw RuntimeException("There are too many kryo class registrations!!")
}
}
savedKryoIdsForRmi = kryoIdsForRmi.toIntArray()
// save as an array to make it faster to send this info to the remote connection
// save this as a byte array (so class registration validation during connection handshake is faster)
val output = AeronOutput()
@ -440,10 +460,17 @@ class Serialization(private val references: Boolean,
*
* @return a compressed byte array of the details of all registration IDs -> Class name -> Serialization type used by kryo
*/
override fun getKryoRegistrationDetails(): ByteArray {
fun getKryoRegistrationDetails(): ByteArray {
return savedRegistrationDetails
}
/**
* @return the details of all registration IDs for RMI iface serializer rewrites
*/
fun getKryoRmiIds(): IntArray {
return savedKryoIdsForRmi
}
/**
* NOTE: When this fails, the CLIENT will just time out. We DO NOT want to send an error message to the client
* (it should check for updates or something else). We do not want to give "rogue" clients knowledge of the
@ -451,7 +478,8 @@ class Serialization(private val references: Boolean,
*
* @return true if kryo registration is required for all classes sent over the wire
*/
override suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean {
@Suppress("DuplicatedCode")
fun verifyKryoRegistration(clientBytes: ByteArray): Boolean {
// verify the registration IDs if necessary with our own. The CLIENT does not verify anything, only the server!
val kryoRegistrationDetails = savedRegistrationDetails
val equals = kryoRegistrationDetails.contentEquals(clientBytes)
@ -518,16 +546,26 @@ class Serialization(private val references: Boolean,
val serializerMatches = serializerServer == serializerClient
if (!serializerMatches) {
// JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff
// JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff explicitly
if (serializerServer == objectResponseSerializer::class.java.name && serializerClient.isEmpty()) {
// this is for SERVER RMI!
} else if (serializerClient == objectResponseSerializer::class.java.name && serializerServer.isEmpty()) {
// this is for CLIENT RMI!
} else {
success = false
logger.error("MISMATCH: Registration $idClient Client -> $nameClient ($serializerClient)")
logger.error("MISMATCH: Registration $idServer Server -> $nameServer ($serializerServer)")
when {
serializerServer == rmiClientReverseSerializer::class.java.name -> {
// this is for when the impl is on server, and iface is on client
// after this check, we tell the client that this ID is for RMI
// This necessary because only 1 side registers RMI iface/impl info
}
serializerClient == rmiClientReverseSerializer::class.java.name -> {
// this is for when the impl is on client, and iface is on server
// after this check, we tell MYSELF (the server) that this id is for RMI
// This necessary because only 1 side registers RMI iface/impl info
}
else -> {
success = false
logger.error("MISMATCH: Registration $idClient Client -> $nameClient ($serializerClient)")
logger.error("MISMATCH: Registration $idServer Server -> $nameServer ($serializerServer)")
}
}
}
}
@ -555,21 +593,23 @@ class Serialization(private val references: Boolean,
returnKryo(kryo)
input.close()
}
return false
}
/**
* @return takes a kryo instance from the pool.
* @return takes a kryo instance from the pool, or creates one if the pool was empty
*/
override fun takeKryo(): KryoExtra {
fun takeKryo(): KryoExtra {
// ALWAYS get as many as needed. Recycle them to prevent too many getting created
return kryoPool.poll() ?: initKryo()
}
/**
* Returns a kryo instance to the pool for use later on
* Returns a kryo instance to the pool for re-use later on
*/
override fun returnKryo(kryo: KryoExtra) {
fun returnKryo(kryo: KryoExtra) {
// return as much as we can. don't suspend if the pool is full, we just throw it away.
kryoPool.offer(kryo)
}
@ -577,15 +617,15 @@ class Serialization(private val references: Boolean,
/**
* Returns the Kryo class registration ID
*/
override fun getClassId(iFace: Class<*>): Int {
fun getClassId(iFace: Class<*>): Int {
return classResolver.getRegistration(iFace).id
}
/**
* Returns the Kryo class from a registration ID
*/
override fun getClassFromId(interfaceClassId: Int): Class<*> {
return classResolver.getRegistration(interfaceClassId).type
fun getClassFromId(kryoId: Int): Class<*> {
return classResolver.getRegistration(kryoId).type
}
@ -594,7 +634,7 @@ class Serialization(private val references: Boolean,
*
* @return the corresponding implementation object
*/
override fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any {
fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any {
try {
if (objectParameters == null) {
return rmiIfaceToInstantiator[interfaceClassId].newInstance()
@ -603,7 +643,7 @@ class Serialization(private val references: Boolean,
val size = objectParameters.size
// we have to get the constructor for this object.
val clazz = getRmiImpl(getClassFromId(interfaceClassId))
val clazz = getClassFromId(interfaceClassId)
val constructors = clazz.declaredConstructors
// now have to find the closest match.
@ -637,23 +677,19 @@ class Serialization(private val references: Boolean,
}
}
/**
* Gets the RMI interface based on the specified implementation
*
* @return the corresponding interface
* Gets the cached methods for the specified class ID
*/
@Suppress("UNCHECKED_CAST")
override fun <T> getRmiImpl(iFace: Class<T>): Class<T> {
return rmiIfaceToImpl[iFace] as Class<T>
}
override fun getMethods(classId: Int): Array<CachedMethod> {
fun getMethods(classId: Int): Array<CachedMethod> {
return methodCache[classId]
}
/**
* @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client
* and server, even if in the same JVM. This only attempts to register once.
*/
@Synchronized
override fun initialized(): Boolean {
fun initialized(): Boolean {
return initialized
}

View File

@ -15,7 +15,7 @@
*/
package dorkbox.network.storage
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import dorkbox.util.storage.Storage
import java.security.SecureRandom
@ -23,7 +23,7 @@ import java.security.SecureRandom
class NullSettingsStore : SettingsStore() {
private var serverSalt: ByteArray? = null
override fun init(serializationManager: NetworkSerializationManager, storage: Storage) {}
override fun init(serializationManager: Serialization, storage: Storage) {}
@Throws(SecurityException::class)
override fun getPrivateKey(): ByteArray {

View File

@ -16,7 +16,7 @@
package dorkbox.network.storage
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.bytes.ByteArrayWrapper
import dorkbox.util.storage.Storage
import org.agrona.collections.Int2ObjectHashMap
@ -35,7 +35,7 @@ class PropertyStore : SettingsStore() {
*
* @param serializationManager this is the serialization used for saving objects into the storage database
*/
override fun init(serializationManager: NetworkSerializationManager, storage: Storage) {
override fun init(serializationManager: Serialization, storage: Storage) {
// make sure our custom types are registered
// only register if not ALREADY initialized, since we can initialize in the server and in the client. This creates problems if
// running inside the same JVM (we don't permit it)

View File

@ -15,7 +15,7 @@
*/
package dorkbox.network.storage
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import dorkbox.util.storage.Storage
import org.slf4j.LoggerFactory
@ -30,7 +30,7 @@ abstract class SettingsStore : AutoCloseable {
/**
* Initialize the settingsStore with the provided serialization manager.
*/
abstract fun init(serializationManager: NetworkSerializationManager, storage: Storage)
abstract fun init(serializationManager: Serialization, storage: Storage)
/**
* Simple, property based method for saving the private key of the server

View File

@ -37,7 +37,7 @@ package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import kotlinx.coroutines.runBlocking
import org.junit.Assert
@ -131,7 +131,7 @@ class PingPongTest : BaseTest() {
}
}
private fun register(manager: NetworkSerializationManager) {
private fun register(manager: Serialization) {
manager.register(Data::class.java)
}

View File

@ -20,7 +20,7 @@ import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import dorkboxTest.network.BaseTest
import kotlinx.coroutines.runBlocking
@ -51,7 +51,7 @@ class RmiDelayedInvocationSpamTest : BaseTest() {
// })
}
fun register(serialization: NetworkSerializationManager) {
fun register(serialization: Serialization) {
serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java)
}

View File

@ -19,7 +19,7 @@ import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import dorkboxTest.network.BaseTest
import kotlinx.coroutines.runBlocking
@ -47,7 +47,7 @@ class RmiDelayedInvocationTest : BaseTest() {
// })
}
fun register(serialization: NetworkSerializationManager) {
fun register(serialization: Serialization) {
serialization.registerRmi(TestObject::class.java, TestObjectImpl::class.java)
}

View File

@ -19,7 +19,7 @@ import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkbox.util.exceptions.SecurityException
import dorkboxTest.network.BaseTest
import kotlinx.coroutines.runBlocking
@ -43,7 +43,7 @@ class RmiInitValidationTest : BaseTest() {
// })
}
private fun register(serialization: NetworkSerializationManager) {
private fun register(serialization: Serialization) {
serialization.register(Command1::class.java)
serialization.register(Command2::class.java)
serialization.register(Command3::class.java)

View File

@ -39,7 +39,7 @@ import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.network.serialization.Serialization
import dorkboxTest.network.BaseTest
import dorkboxTest.network.rmi.classes.MessageWithTestCow
import dorkboxTest.network.rmi.classes.TestCow
@ -160,7 +160,7 @@ class RmiTest : BaseTest() {
connection.logger.error("Finished tests")
}
fun register(manager: NetworkSerializationManager) {
fun register(manager: Serialization) {
manager.register(Any::class.java) // Needed for Object#toString, hashCode, etc.
manager.register(TestCow::class.java)
manager.register(MessageWithTestCow::class.java)

View File

@ -0,0 +1,23 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkboxTest.network.rmi.classes
/**
*
*/
interface TestBabyCow : TestCow {
fun drink()
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkboxTest.network.rmi.classes
class TestBabyCowImpl(id: Int) : TestCowImpl(id), TestBabyCow {
override fun drink() {
println("Drinking milk!!")
}
override fun toString(): String {
return "Tada! This is a remote object baby cow!"
}
}

View File

@ -18,7 +18,7 @@ package dorkboxTest.network.rmi.classes
import dorkbox.network.connection.Connection
import kotlinx.coroutines.delay
class TestCowImpl(val id: Int) : TestCowBaseImpl(), TestCow {
open class TestCowImpl(val id: Int) : TestCowBaseImpl(), TestCow {
private var moos = 0

View File

@ -26,8 +26,11 @@ import dorkbox.network.Client
import dorkbox.network.connection.Connection
import dorkboxTest.network.BaseTest
import dorkboxTest.network.rmi.RmiTest
import dorkboxTest.network.rmi.classes.TestBabyCow
import dorkboxTest.network.rmi.classes.TestBabyCowImpl
import dorkboxTest.network.rmi.classes.TestCow
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.slf4j.LoggerFactory
object TestClient {
@ -66,6 +69,7 @@ object TestClient {
val configuration = BaseTest.clientConfig()
RmiTest.register(configuration.serialization)
configuration.serialization.registerRmi(TestBabyCow::class.java, TestBabyCowImpl::class.java)
configuration.serialization.register(TestCow::class.java)
configuration.enableRemoteSignatureValidation = false
@ -75,16 +79,30 @@ object TestClient {
System.err.println("Starting test for: Client -> Server")
connection.createObject<TestCow>(124123) { _, remoteObject ->
RmiTest.runTests(connection, remoteObject, 124123)
System.err.println("DONE")
// RmiTest.runTests(connection, remoteObject, 124123)
// System.err.println("DONE")
// now send this remote object ACROSS the wire to the server (on the server, this is where the IMPLEMENTATION lives)
connection.send(remoteObject)
client.close()
// client.close()
}
}
client.onMessage<TestCow> { connection, test ->
System.err.println("Received test cow from server")
// this object LIVES on the server.
try {
test.moo()
} catch (e: Exception) {
Assert.fail("No exception should be caught.")
}
connection.close()
}
runBlocking {
client.connect(BaseTest.LOOPBACK)
}

View File

@ -20,6 +20,7 @@ import dorkbox.network.connection.Connection
import dorkboxTest.network.BaseTest
import dorkboxTest.network.rmi.RmiTest
import dorkboxTest.network.rmi.classes.MessageWithTestCow
import dorkboxTest.network.rmi.classes.TestBabyCow
import dorkboxTest.network.rmi.classes.TestCow
import dorkboxTest.network.rmi.classes.TestCowImpl
import dorkboxTest.network.rmi.multiJVM.TestClient.setup
@ -37,6 +38,7 @@ object TestServer {
val configuration = BaseTest.serverConfig()
RmiTest.register(configuration.serialization)
configuration.serialization.register(TestBabyCow::class.java)
configuration.serialization.registerRmi(TestCow::class.java, TestCowImpl::class.java)
configuration.enableRemoteSignatureValidation = false
@ -62,15 +64,15 @@ object TestServer {
System.err.println("Received test cow from client")
// this object LIVES on the server.
test.moo()
test.moo("Cow")
Assert.assertEquals(123123, test.id())
try {
test.moo()
Assert.fail("Should catch an exception!")
} catch (e: Exception) {
}
// Test that RMI correctly waits for the remotely invoked method to exit
test.moo("You should see this two seconds before...", 2000)
connection.logger.error("...This")
// now test sending this object BACK to the client. The client SHOULD have the same RMI proxy object as before!
connection.send(test)
//
// System.err.println("Starting test for: Server -> Client")
// connection.createObject<TestCow>(123) { rmiId, remoteObject ->
// System.err.println("Running test for: Server -> Client")