Cleaned up log message. Now supports constructor parameters for creating RMI objects
This commit is contained in:
parent
3513e58d30
commit
a58af6ba00
|
@ -100,14 +100,14 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
|
|||
|
||||
init {
|
||||
// have to do some basic validation of our configuration
|
||||
if (config.publicationPort <= 0) { throw newException("configuration port must be > 0") }
|
||||
if (config.publicationPort >= 65535) { throw newException("configuration port must be < 65535") }
|
||||
if (config.publicationPort <= 0) { throw ClientException("configuration port must be > 0") }
|
||||
if (config.publicationPort >= 65535) { throw ClientException("configuration port must be < 65535") }
|
||||
|
||||
if (config.subscriptionPort <= 0) { throw newException("configuration controlPort must be > 0") }
|
||||
if (config.subscriptionPort >= 65535) { throw newException("configuration controlPort must be < 65535") }
|
||||
if (config.subscriptionPort <= 0) { throw ClientException("configuration controlPort must be > 0") }
|
||||
if (config.subscriptionPort >= 65535) { throw ClientException("configuration controlPort must be < 65535") }
|
||||
|
||||
if (config.networkMtuSize <= 0) { throw newException("configuration networkMtuSize must be > 0") }
|
||||
if (config.networkMtuSize >= 9 * 1024) { throw newException("configuration networkMtuSize must be < ${9 * 1024}") }
|
||||
if (config.networkMtuSize <= 0) { throw ClientException("configuration networkMtuSize must be > 0") }
|
||||
if (config.networkMtuSize >= 9 * 1024) { throw ClientException("configuration networkMtuSize must be < ${9 * 1024}") }
|
||||
|
||||
autoClosableObjects.add(handshake)
|
||||
}
|
||||
|
@ -618,13 +618,43 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
|
|||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
suspend inline fun <reified Iface> createObject(noinline callback: suspend (Iface) -> Unit) {
|
||||
suspend inline fun <reified Iface> createObject(vararg objectParameters: Any?, noinline callback: suspend (Int, Iface) -> Unit) {
|
||||
// NOTE: It's not possible to have reified inside a virtual function
|
||||
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
|
||||
val classId = serialization.getClassId(Iface::class.java)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
objectParameters as Array<Any?>
|
||||
|
||||
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
|
||||
rmiConnectionSupport.createRemoteObject(getConnection(), classId, objectParameters, callback)
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells the remote connection to create a new proxy object that implements the specified interface in the CONNECTION scope.
|
||||
*
|
||||
* The methods on this object "map" to an object that is created remotely.
|
||||
*
|
||||
* The callback will be notified when the remote object has been created.
|
||||
*
|
||||
* Methods that return a value will throw [TimeoutException] if the response is not received with the
|
||||
* response timeout [RemoteObject.responseTimeout].
|
||||
*
|
||||
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
|
||||
* will have the proxy object replaced with the registered (non-proxy) object.
|
||||
*
|
||||
* If one wishes to change the default behavior, cast the object to access the different methods.
|
||||
* ie: `val remoteObject = test as RemoteObject`
|
||||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
suspend inline fun <reified Iface> createObject(noinline callback: suspend (Int, Iface) -> Unit) {
|
||||
// NOTE: It's not possible to have reified inside a virtual function
|
||||
// https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function
|
||||
val classId = serialization.getClassId(Iface::class.java)
|
||||
|
||||
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
|
||||
rmiConnectionSupport.createRemoteObject(getConnection(), classId, callback)
|
||||
rmiConnectionSupport.createRemoteObject(getConnection(), classId, null, callback)
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -110,14 +110,14 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
|||
}
|
||||
|
||||
|
||||
if (config.publicationPort <= 0) { throw newException("configuration port must be > 0") }
|
||||
if (config.publicationPort >= 65535) { throw newException("configuration port must be < 65535") }
|
||||
if (config.publicationPort <= 0) { throw ServerException("configuration port must be > 0") }
|
||||
if (config.publicationPort >= 65535) { throw ServerException("configuration port must be < 65535") }
|
||||
|
||||
if (config.subscriptionPort <= 0) { throw newException("configuration controlPort must be > 0") }
|
||||
if (config.subscriptionPort >= 65535) { throw newException("configuration controlPort must be < 65535") }
|
||||
if (config.subscriptionPort <= 0) { throw ServerException("configuration controlPort must be > 0") }
|
||||
if (config.subscriptionPort >= 65535) { throw ServerException("configuration controlPort must be < 65535") }
|
||||
|
||||
if (config.networkMtuSize <= 0) { throw newException("configuration networkMtuSize must be > 0") }
|
||||
if (config.networkMtuSize >= 9 * 1024) { throw newException("configuration networkMtuSize must be < ${9 * 1024}") }
|
||||
if (config.networkMtuSize <= 0) { throw ServerException("configuration networkMtuSize must be > 0") }
|
||||
if (config.networkMtuSize >= 9 * 1024) { throw ServerException("configuration networkMtuSize must be < ${9 * 1024}") }
|
||||
|
||||
autoClosableObjects.add(handshake)
|
||||
}
|
||||
|
|
|
@ -238,7 +238,9 @@ open class Connection(val endPoint: EndPoint<*>, mediaDriverConnection: MediaDri
|
|||
*/
|
||||
suspend fun send(message: Any) {
|
||||
// The sessionId is globally unique, and is assigned by the server.
|
||||
logger.debug("[{}] send: {}", publication.sessionId(), message)
|
||||
logger.trace {
|
||||
"[${publication.sessionId()}] send: $message"
|
||||
}
|
||||
|
||||
val kryo: KryoExtra = serialization.takeKryo()
|
||||
try {
|
||||
|
@ -780,10 +782,37 @@ open class Connection(val endPoint: EndPoint<*>, mediaDriverConnection: MediaDri
|
|||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
suspend fun <Iface> createObject(callback: suspend (Iface) -> Unit) {
|
||||
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 0)
|
||||
suspend fun <Iface> createObject(vararg objectParameters: Any?, callback: suspend (Int, Iface) -> Unit) {
|
||||
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 1)
|
||||
val interfaceClassId = endPoint.serialization.getClassId(iFaceClass)
|
||||
|
||||
rmiConnectionSupport.createRemoteObject(this, interfaceClassId, callback)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
objectParameters as Array<Any?>
|
||||
|
||||
rmiConnectionSupport.createRemoteObject(this, interfaceClassId, objectParameters, callback)
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map"
|
||||
* to an object that is created remotely.
|
||||
*
|
||||
* The callback will be notified when the remote object has been created.
|
||||
*
|
||||
* Methods that return a value will throw [TimeoutException] if the response is not received with the
|
||||
* response timeout [RemoteObject.responseTimeout].
|
||||
*
|
||||
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
|
||||
* will have the proxy object replaced with the registered (non-proxy) object.
|
||||
*
|
||||
* If one wishes to change the default behavior, cast the object to access the different methods.
|
||||
* ie: `val remoteObject = test as RemoteObject`
|
||||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
suspend fun <Iface> createObject(callback: suspend (Int, Iface) -> Unit) {
|
||||
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 1)
|
||||
val interfaceClassId = endPoint.serialization.getClassId(iFaceClass)
|
||||
|
||||
rmiConnectionSupport.createRemoteObject(this, interfaceClassId, null, callback)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -444,7 +444,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
|
||||
internal suspend fun writeHandshakeMessage(publication: Publication, message: Any) {
|
||||
// The sessionId is globally unique, and is assigned by the server.
|
||||
logger.debug("[{}] send: {}", publication.sessionId(), message)
|
||||
logger.trace {
|
||||
"[${publication.sessionId()}] send: $message"
|
||||
}
|
||||
|
||||
val kryo: KryoExtra = serialization.takeKryo()
|
||||
try {
|
||||
|
@ -491,7 +493,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
val kryo: KryoExtra = serialization.takeKryo()
|
||||
try {
|
||||
val message = kryo.read(buffer, offset, length)
|
||||
logger.debug("[{}] received: {}", header.sessionId(), message)
|
||||
logger.trace {
|
||||
"[${header.sessionId()}] received: $message"
|
||||
}
|
||||
|
||||
return message
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error de-serializing message on connection ${header.sessionId()}!", e)
|
||||
|
@ -523,7 +528,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
val kryo: KryoExtra = serialization.takeKryo()
|
||||
try {
|
||||
message = kryo.read(buffer, offset, length, connection)
|
||||
logger.debug("[{}] received: {}", sessionId, message)
|
||||
logger.trace {
|
||||
"[${sessionId}] received: ${message}"
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
listenerManager.notifyError(newException("[${sessionId}] Error de-serializing message", e))
|
||||
} finally {
|
||||
|
|
|
@ -191,7 +191,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
|
|||
* + class and object bytes +
|
||||
* ++++++++++++++++++++++++++
|
||||
*/
|
||||
private fun read(reader: Input): Any {
|
||||
fun read(reader: Input): Any {
|
||||
return readClassAndObject(reader)
|
||||
}
|
||||
|
||||
|
|
|
@ -175,7 +175,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
|||
*
|
||||
* This method should not block for long periods as other network activity will not be processed until it returns.
|
||||
*/
|
||||
suspend fun <M : Any> onMessage(function: suspend (CONNECTION, M) -> Unit) {
|
||||
suspend fun <MESSAGE : Any> onMessage(function: suspend (CONNECTION, MESSAGE) -> Unit) {
|
||||
onMessageMutex.withLock {
|
||||
// we have to follow the single-writer principle!
|
||||
|
||||
|
|
|
@ -127,7 +127,7 @@ internal class RmiSupport(logger: KLogger,
|
|||
* called on "client"
|
||||
*/
|
||||
private fun onGenericObjectResponse(endPoint: EndPoint<*>, connection: Connection, logger: KLogger,
|
||||
isGlobal: Boolean, rmiId: Int, callback: suspend (Any) -> Unit,
|
||||
isGlobal: Boolean, rmiId: Int, callback: suspend (Int, Any) -> Unit,
|
||||
rmiSupportCache: RmiSupportCache, serialization: NetworkSerializationManager) {
|
||||
|
||||
// we only create the proxy + execute the callback if the RMI id is valid!
|
||||
|
@ -138,7 +138,7 @@ internal class RmiSupport(logger: KLogger,
|
|||
return
|
||||
}
|
||||
|
||||
val interfaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback::class.java, callback.javaClass, 0)
|
||||
val interfaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback::class.java, callback.javaClass, 1)
|
||||
|
||||
// create the client-side proxy object, if possible
|
||||
var proxyObject = rmiSupportCache.getProxyObject(rmiId)
|
||||
|
@ -150,7 +150,7 @@ internal class RmiSupport(logger: KLogger,
|
|||
// this should be executed on a NEW coroutine!
|
||||
endPoint.actionDispatch.launch {
|
||||
try {
|
||||
callback(proxyObject)
|
||||
callback(rmiId, proxyObject)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error getting or creating the remote object $interfaceClass", e)
|
||||
}
|
||||
|
@ -164,11 +164,11 @@ internal class RmiSupport(logger: KLogger,
|
|||
|
||||
|
||||
|
||||
internal fun <Iface> registerCallback(callback: suspend (Iface) -> Unit): Int {
|
||||
internal fun <Iface> registerCallback(callback: suspend (Int, Iface) -> Unit): Int {
|
||||
return remoteObjectCreationCallbacks.register(callback)
|
||||
}
|
||||
|
||||
private fun removeCallback(callbackId: Int): suspend (Any) -> Unit {
|
||||
private fun removeCallback(callbackId: Int): suspend (Int, Any) -> Unit {
|
||||
// callback's area always correct, because we track them ourselves.
|
||||
return remoteObjectCreationCallbacks.remove(callbackId)!!
|
||||
}
|
||||
|
@ -382,12 +382,25 @@ internal class RmiSupport(logger: KLogger,
|
|||
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) {
|
||||
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
|
||||
val callbackId = RmiUtils.unpackRight(message.packedIds)
|
||||
val objectParameters = message.objectParameters
|
||||
|
||||
|
||||
// We have to lookup the iface, since the proxy object requires it
|
||||
val implObject = endPoint.serialization.createRmiObject(interfaceClassId)
|
||||
val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters)
|
||||
|
||||
val response = if (implObject is Exception) {
|
||||
// whoops!
|
||||
logger.error("Unable to create remote object!", implObject)
|
||||
|
||||
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
|
||||
GlobalObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId))
|
||||
} else {
|
||||
val rmiId = saveImplObject(logger, implObject)
|
||||
|
||||
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
|
||||
connection.send(GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId)))
|
||||
GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))
|
||||
}
|
||||
|
||||
connection.send(response)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,11 +42,11 @@ internal class RmiSupportConnection(logger: KLogger,
|
|||
/**
|
||||
* on the "client" to create a connection-specific remote object (that exists on the server)
|
||||
*/
|
||||
suspend fun <Iface> createRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) {
|
||||
suspend fun <Iface> createRemoteObject(connection: Connection, interfaceClassId: Int, objectParameters: Array<Any?>?, callback: suspend (Int, Iface) -> Unit) {
|
||||
val callbackId = rmiGlobalSupport.registerCallback(callback)
|
||||
|
||||
// There is no rmiID yet, because we haven't created it!
|
||||
val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId))
|
||||
val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId), objectParameters)
|
||||
|
||||
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
|
@ -62,9 +62,18 @@ internal class RmiSupportConnection(logger: KLogger,
|
|||
|
||||
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
|
||||
val callbackId = RmiUtils.unpackRight(message.packedIds)
|
||||
val objectParameters = message.objectParameters
|
||||
|
||||
// We have to lookup the iface, since the proxy object requires it
|
||||
val implObject = endPoint.serialization.createRmiObject(interfaceClassId)
|
||||
val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters)
|
||||
|
||||
val response = if (implObject is Exception) {
|
||||
// whoops!
|
||||
logger.error("Unable to create remote object!", implObject)
|
||||
|
||||
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
|
||||
ConnectionObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId))
|
||||
} else {
|
||||
val rmiId = saveImplObject(implObject)
|
||||
|
||||
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
|
||||
|
@ -81,6 +90,9 @@ internal class RmiSupportConnection(logger: KLogger,
|
|||
}
|
||||
|
||||
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
|
||||
connection.send(ConnectionObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId)))
|
||||
ConnectionObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))
|
||||
}
|
||||
|
||||
connection.send(response)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,5 +20,21 @@ package dorkbox.network.rmi.messages
|
|||
*
|
||||
* @param interfaceClassId (LEFT) the Kryo interface class ID to create
|
||||
* @param callbackId (RIGHT) to know which callback to use when the object is created
|
||||
* @param objectParameters the constructor parameters to create the object with
|
||||
*/
|
||||
data class ConnectionObjectCreateRequest(val packedIds: Int) : RmiMessage
|
||||
data class ConnectionObjectCreateRequest(val packedIds: Int, val objectParameters: Array<Any?>?) : RmiMessage {
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (javaClass != other?.javaClass) return false
|
||||
|
||||
other as ConnectionObjectCreateRequest
|
||||
|
||||
if (packedIds != other.packedIds) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
return packedIds
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,6 @@ package dorkbox.network.rmi.messages
|
|||
*
|
||||
* @param interfaceClassId (LEFT) the Kryo interface class ID to create
|
||||
* @param callbackId (RIGHT) to know which callback to use when the object is created
|
||||
* @param objectParameters the constructor parameters to create the object with
|
||||
*/
|
||||
data class GlobalObjectCreateRequest(val packedIds: Int) : RmiMessage
|
||||
//a asd
|
||||
data class GlobalObjectCreateRequest(val packedIds: Int, val objectParameters: Array<Any?>?) : RmiMessage
|
||||
|
|
|
@ -49,6 +49,7 @@ import dorkbox.network.connection.KryoExtra
|
|||
*/
|
||||
class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap<Class<*>, Class<*>>) : Serializer<Any>(false) {
|
||||
override fun write(kryo: Kryo, output: Output, `object`: Any) {
|
||||
println(" FIX ObjectResponseSerializer ")
|
||||
val kryoExtra = kryo as KryoExtra
|
||||
// val id = kryoExtra.rmiSupport.getRegisteredId(`object`) //
|
||||
// output.writeInt(id, true)
|
||||
|
@ -56,6 +57,7 @@ class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap<Class<*>,
|
|||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, implementationType: Class<*>): Any? {
|
||||
println(" FIX ObjectResponseSerializer ")
|
||||
val kryoExtra = kryo as KryoExtra
|
||||
val objectID = input.readInt(true)
|
||||
|
||||
|
@ -66,3 +68,5 @@ class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap<Class<*>,
|
|||
return null
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: FIX THIS CLASS MAYBE!
|
||||
|
|
|
@ -19,59 +19,9 @@ import com.esotericsoftware.kryo.Serializer
|
|||
import dorkbox.network.connection.KryoExtra
|
||||
import dorkbox.network.rmi.CachedMethod
|
||||
import dorkbox.util.serialization.SerializationManager
|
||||
import org.agrona.DirectBuffer
|
||||
|
||||
interface NetworkSerializationManager : SerializationManager {
|
||||
// /**
|
||||
// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
|
||||
// *
|
||||
// *
|
||||
// * There is a small speed penalty if there were no kryo's available to use.
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun write(connection: Connection_, message: Any)
|
||||
|
||||
// /**
|
||||
// * Reads an object from the buffer.
|
||||
// *
|
||||
// * @param length should ALWAYS be the length of the expected object!
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun read(connection: Connection_, length: Int): Any
|
||||
//
|
||||
// /**
|
||||
// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
|
||||
// *
|
||||
// *
|
||||
// * There is a small speed penalty if there were no kryo's available to use.
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun writeWithCompression(connection: Connection_, message: Any)
|
||||
//
|
||||
// /**
|
||||
// * Reads an object from the buffer.
|
||||
// *
|
||||
// * @param length should ALWAYS be the length of the expected object!
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun readWithCompression(connection: Connection_, length: Int): Any
|
||||
|
||||
// /**
|
||||
// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
|
||||
// *
|
||||
// *
|
||||
// * There is a small speed penalty if there were no kryo's available to use.
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun writeWithCrypto(connection: Connection_, message: Any)
|
||||
//
|
||||
// /**
|
||||
// * Reads an object from the buffer.
|
||||
// *
|
||||
// * @param length should ALWAYS be the length of the expected object!
|
||||
// */
|
||||
// @Throws(IOException::class)
|
||||
// fun readWithCrypto(connection: Connection_, length: Int): Any
|
||||
|
||||
interface NetworkSerializationManager : SerializationManager<DirectBuffer> {
|
||||
/**
|
||||
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
|
||||
* If the class is already registered, the existing entry is updated with the new serializer.
|
||||
|
@ -153,7 +103,7 @@ interface NetworkSerializationManager : SerializationManager {
|
|||
*
|
||||
* @return the corresponding implementation object
|
||||
*/
|
||||
fun createRmiObject(interfaceClassId: Int): Any
|
||||
fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any
|
||||
|
||||
/**
|
||||
* Returns the Kryo class registration ID
|
||||
|
|
|
@ -26,6 +26,8 @@ import com.esotericsoftware.kryo.util.IdentityMap
|
|||
import dorkbox.network.connection.KryoExtra
|
||||
import dorkbox.network.connection.ping.PingMessage
|
||||
import dorkbox.network.handshake.Message
|
||||
import dorkbox.network.pipeline.AeronInput
|
||||
import dorkbox.network.pipeline.AeronOutput
|
||||
import dorkbox.network.rmi.CachedMethod
|
||||
import dorkbox.network.rmi.RmiUtils
|
||||
import dorkbox.network.rmi.messages.*
|
||||
|
@ -33,14 +35,16 @@ import dorkbox.objectPool.ObjectPool
|
|||
import dorkbox.objectPool.PoolableObject
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.serialization.SerializationDefaults
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.Unpooled
|
||||
import org.agrona.DirectBuffer
|
||||
import org.agrona.MutableDirectBuffer
|
||||
import org.agrona.collections.Int2ObjectHashMap
|
||||
import org.objenesis.instantiator.ObjectInstantiator
|
||||
import org.objenesis.strategy.StdInstantiatorStrategy
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.lang.reflect.Constructor
|
||||
import java.lang.reflect.InvocationHandler
|
||||
|
||||
/**
|
||||
|
@ -549,10 +553,49 @@ class Serialization(references: Boolean,
|
|||
*
|
||||
* @return the corresponding implementation object
|
||||
*/
|
||||
override fun createRmiObject(interfaceClassId: Int): Any {
|
||||
override fun createRmiObject(interfaceClassId: Int, objectParameters: Array<Any?>?): Any {
|
||||
try {
|
||||
if (objectParameters == null) {
|
||||
return rmiIfaceToInstantiator[interfaceClassId].newInstance()
|
||||
}
|
||||
|
||||
val size = objectParameters.size
|
||||
|
||||
// we have to get the constructor for this object.
|
||||
val clazz = getRmiImpl(getClassFromId(interfaceClassId))
|
||||
val constructors = clazz.declaredConstructors
|
||||
|
||||
// now have to find the closest match.
|
||||
val matchedBySize = constructors.filter { it.parameterCount == size }
|
||||
|
||||
if (matchedBySize.size == 1) {
|
||||
// this is our only option
|
||||
return matchedBySize[0].newInstance(*objectParameters)
|
||||
}
|
||||
|
||||
// have to match by type
|
||||
val matchedByType = mutableListOf<Pair<Int, Constructor<*>>>()
|
||||
objectParameters.forEachIndexed { index, any ->
|
||||
if (any != null) {
|
||||
matchedBySize.forEach { singleConstructor ->
|
||||
var matchCount = 0
|
||||
if (singleConstructor.parameterTypes[index] == any::class.java) {
|
||||
matchCount++
|
||||
}
|
||||
|
||||
matchedByType.add(Pair(matchCount, singleConstructor))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// find the constructor with the highest match
|
||||
matchedByType.sortByDescending { it.first }
|
||||
return matchedByType[0].second.newInstance(*objectParameters)
|
||||
} catch(e: Exception) {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets the RMI interface based on the specified implementation
|
||||
|
@ -583,14 +626,14 @@ class Serialization(references: Boolean,
|
|||
* There is a small speed penalty if there were no kryo's available to use.
|
||||
*/
|
||||
@Throws(IOException::class)
|
||||
override fun write(buffer: ByteBuf, message: Any) {
|
||||
// val kryo = kryoPool.take()
|
||||
// try {
|
||||
// kryo.writeClassAndObject(buffer, message)
|
||||
// kryo.write(NOP_CONNECTION, message)
|
||||
// } finally {
|
||||
// kryoPool.put(kryo)
|
||||
// }
|
||||
override fun write(buffer: DirectBuffer, message: Any) {
|
||||
val kryo = kryoPool.take()
|
||||
try {
|
||||
val output = AeronOutput(buffer as MutableDirectBuffer)
|
||||
kryo.writeClassAndObject(output, message)
|
||||
} finally {
|
||||
kryoPool.put(kryo)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -602,22 +645,14 @@ class Serialization(references: Boolean,
|
|||
* @param length should ALWAYS be the length of the expected object!
|
||||
*/
|
||||
@Throws(IOException::class)
|
||||
override fun read(buffer: ByteBuf, length: Int): Any? {
|
||||
// val kryo = kryoPool.take()
|
||||
// return try {
|
||||
// if (wireReadLogger.isTraceEnabled) {
|
||||
// val start = buffer.readerIndex()
|
||||
// val `object` = kryo.read(buffer)
|
||||
// val end = buffer.readerIndex()
|
||||
// wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start))
|
||||
// `object`
|
||||
// } else {
|
||||
// kryo.read(NOP_CONNECTION, buffer)
|
||||
// }
|
||||
// } finally {
|
||||
// kryoPool.put(kryo)
|
||||
// }
|
||||
return null
|
||||
override fun read(buffer: DirectBuffer, length: Int): Any? {
|
||||
val kryo = kryoPool.take()
|
||||
return try {
|
||||
val input = AeronInput(buffer)
|
||||
kryo.readClassAndObject(input)
|
||||
} finally {
|
||||
kryoPool.put(kryo)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,8 +22,8 @@ package dorkboxTest.network
|
|||
import dorkbox.network.Client
|
||||
import dorkbox.network.Server
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.serialization.NetworkSerializationManager
|
||||
import dorkbox.util.exceptions.SecurityException
|
||||
import dorkbox.util.serialization.SerializationManager
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
|
@ -104,7 +104,7 @@ class PingPongTest : BaseTest() {
|
|||
}
|
||||
}
|
||||
|
||||
private fun register(manager: SerializationManager) {
|
||||
private fun register(manager: NetworkSerializationManager) {
|
||||
manager.register(Data::class.java)
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ class RmiTest : BaseTest() {
|
|||
Assert.assertFalse(s == remoteObject.toString())
|
||||
test.moo()
|
||||
test.moo("Cow")
|
||||
Assert.assertEquals(remoteObjectID.toLong(), test.id().toLong())
|
||||
Assert.assertEquals(remoteObjectID, test.id())
|
||||
|
||||
// Test that RMI correctly waits for the remotely invoked method to exit
|
||||
remoteObject.responseTimeout = 5000
|
||||
|
@ -135,6 +135,7 @@ class RmiTest : BaseTest() {
|
|||
m.number = 678
|
||||
m.text = "sometext"
|
||||
connection.send(m)
|
||||
|
||||
println("Finished tests")
|
||||
}
|
||||
|
||||
|
@ -150,20 +151,12 @@ class RmiTest : BaseTest() {
|
|||
@Throws(SecurityException::class, IOException::class, InterruptedException::class)
|
||||
fun rmiNetworkGlobal() {
|
||||
rmiGlobal()
|
||||
|
||||
// have to reset the object ID counter
|
||||
TestCowImpl.ID_COUNTER.set(1)
|
||||
Thread.sleep(2000L)
|
||||
}
|
||||
|
||||
@Test
|
||||
@Throws(SecurityException::class, IOException::class, InterruptedException::class)
|
||||
fun rmiNetworkConnection() {
|
||||
rmi()
|
||||
|
||||
// have to reset the object ID counter
|
||||
TestCowImpl.ID_COUNTER.set(1)
|
||||
Thread.sleep(2000L)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -174,10 +167,6 @@ class RmiTest : BaseTest() {
|
|||
configuration.listenIpAddress = LOOPBACK
|
||||
}
|
||||
}
|
||||
|
||||
// have to reset the object ID counter
|
||||
TestCowImpl.ID_COUNTER.set(1)
|
||||
Thread.sleep(2000L)
|
||||
}
|
||||
|
||||
@Throws(SecurityException::class, IOException::class)
|
||||
|
@ -199,14 +188,14 @@ class RmiTest : BaseTest() {
|
|||
System.err.println("Received finish signal for test for: Client -> Server")
|
||||
val `object` = m.testCow
|
||||
val id = `object`.id()
|
||||
Assert.assertEquals(1, id.toLong())
|
||||
Assert.assertEquals(23, id.toLong())
|
||||
System.err.println("Finished test for: Client -> Server")
|
||||
System.err.println("Starting test for: Server -> Client")
|
||||
|
||||
// normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug
|
||||
connection.createObject<TestCow> { remoteObject ->
|
||||
|
||||
System.err.println("Starting test for: Server -> Client")
|
||||
connection.createObject<TestCow>(123) { rmiId, remoteObject ->
|
||||
System.err.println("Running test for: Server -> Client")
|
||||
runTests(connection, remoteObject, 2)
|
||||
runTests(connection, remoteObject, 123)
|
||||
System.err.println("Done with test for: Server -> Client")
|
||||
}
|
||||
}
|
||||
|
@ -224,9 +213,9 @@ class RmiTest : BaseTest() {
|
|||
addEndPoint(client)
|
||||
|
||||
client.onConnect { connection ->
|
||||
connection.createObject<TestCow> { remoteObject ->
|
||||
connection.createObject<TestCow>(23) { rmiId, remoteObject ->
|
||||
System.err.println("Running test for: Client -> Server")
|
||||
runTests(connection, remoteObject, 1)
|
||||
runTests(connection, remoteObject, 23)
|
||||
System.err.println("Done with test for: Client -> Server")
|
||||
}
|
||||
}
|
||||
|
@ -235,7 +224,7 @@ class RmiTest : BaseTest() {
|
|||
System.err.println("Received finish signal for test for: Client -> Server")
|
||||
val `object` = m.testCow
|
||||
val id = `object`.id()
|
||||
Assert.assertEquals(2, id.toLong())
|
||||
Assert.assertEquals(123, id.toLong())
|
||||
System.err.println("Finished test for: Client -> Server")
|
||||
stopEndPoints(2000)
|
||||
}
|
||||
|
@ -269,14 +258,14 @@ class RmiTest : BaseTest() {
|
|||
val `object` = m.testCow
|
||||
val id = `object`.id()
|
||||
|
||||
Assert.assertEquals(1, id.toLong())
|
||||
Assert.assertEquals(44, id.toLong())
|
||||
|
||||
System.err.println("Finished test for: Client -> Server")
|
||||
|
||||
// normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug
|
||||
connection.createObject<TestCow> { remoteObject ->
|
||||
connection.createObject<TestCow>(4) { rmiId, remoteObject ->
|
||||
System.err.println("Running test for: Server -> Client")
|
||||
runTests(connection, remoteObject, 2)
|
||||
runTests(connection, remoteObject, 4)
|
||||
System.err.println("Done with test for: Server -> Client")
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +285,7 @@ class RmiTest : BaseTest() {
|
|||
System.err.println("Received finish signal for test for: Client -> Server")
|
||||
val `object` = m.testCow
|
||||
val id = `object`.id()
|
||||
Assert.assertEquals(2, id.toLong())
|
||||
Assert.assertEquals(4, id.toLong())
|
||||
System.err.println("Finished test for: Client -> Server")
|
||||
stopEndPoints(2000)
|
||||
}
|
||||
|
@ -307,9 +296,9 @@ class RmiTest : BaseTest() {
|
|||
System.err.println("Starting test for: Client -> Server")
|
||||
|
||||
// this creates a GLOBAL object on the server (instead of a connection specific object)
|
||||
client.createObject<TestCow> { remoteObject ->
|
||||
client.createObject<TestCow>(44) { rmiId, remoteObject ->
|
||||
System.err.println("Running test for: Client -> Server")
|
||||
runTests(client.getConnection(), remoteObject, 1)
|
||||
runTests(client.getConnection(), remoteObject, 44)
|
||||
System.err.println("Done with test for: Client -> Server")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ package dorkboxTest.network.rmi.classes
|
|||
|
||||
open class TestCowBaseImpl : TestCowBase {
|
||||
override fun throwException() {
|
||||
System.err.println("The following exception is EXPECTED, but should only be on one log!")
|
||||
throw UnsupportedOperationException("Why would I do that?")
|
||||
}
|
||||
|
||||
|
|
|
@ -14,18 +14,10 @@
|
|||
*/
|
||||
package dorkboxTest.network.rmi.classes
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class TestCowImpl : TestCowBaseImpl(),
|
||||
TestCow {
|
||||
|
||||
companion object {
|
||||
// has to start at 1
|
||||
val ID_COUNTER = AtomicInteger(1)
|
||||
}
|
||||
class TestCowImpl(val id: Int) : TestCowBaseImpl(), TestCow {
|
||||
|
||||
private var moos = 0
|
||||
private val id = ID_COUNTER.getAndIncrement()
|
||||
|
||||
override fun moo() {
|
||||
moos++
|
||||
println("Moo! $moos")
|
||||
|
|
Loading…
Reference in New Issue
Block a user