diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index ca23622a..1240e1ed 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -100,14 +100,14 @@ open class Client(config: Configuration = Configuration init { // have to do some basic validation of our configuration - if (config.publicationPort <= 0) { throw newException("configuration port must be > 0") } - if (config.publicationPort >= 65535) { throw newException("configuration port must be < 65535") } + if (config.publicationPort <= 0) { throw ClientException("configuration port must be > 0") } + if (config.publicationPort >= 65535) { throw ClientException("configuration port must be < 65535") } - if (config.subscriptionPort <= 0) { throw newException("configuration controlPort must be > 0") } - if (config.subscriptionPort >= 65535) { throw newException("configuration controlPort must be < 65535") } + if (config.subscriptionPort <= 0) { throw ClientException("configuration controlPort must be > 0") } + if (config.subscriptionPort >= 65535) { throw ClientException("configuration controlPort must be < 65535") } - if (config.networkMtuSize <= 0) { throw newException("configuration networkMtuSize must be > 0") } - if (config.networkMtuSize >= 9 * 1024) { throw newException("configuration networkMtuSize must be < ${9 * 1024}") } + if (config.networkMtuSize <= 0) { throw ClientException("configuration networkMtuSize must be > 0") } + if (config.networkMtuSize >= 9 * 1024) { throw ClientException("configuration networkMtuSize must be < ${9 * 1024}") } autoClosableObjects.add(handshake) } @@ -618,13 +618,43 @@ open class Client(config: Configuration = Configuration * * @see RemoteObject */ - suspend inline fun createObject(noinline callback: suspend (Iface) -> Unit) { + suspend inline fun createObject(vararg objectParameters: Any?, noinline callback: suspend (Int, Iface) -> Unit) { + // NOTE: It's not possible to have reified inside a virtual function + // https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function + val classId = serialization.getClassId(Iface::class.java) + + @Suppress("UNCHECKED_CAST") + objectParameters as Array + + @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE") + rmiConnectionSupport.createRemoteObject(getConnection(), classId, objectParameters, callback) + } + + /** + * Tells the remote connection to create a new proxy object that implements the specified interface in the CONNECTION scope. + * + * The methods on this object "map" to an object that is created remotely. + * + * The callback will be notified when the remote object has been created. + * + * Methods that return a value will throw [TimeoutException] if the response is not received with the + * response timeout [RemoteObject.responseTimeout]. + * + * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + * + * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `val remoteObject = test as RemoteObject` + * + * @see RemoteObject + */ + suspend inline fun createObject(noinline callback: suspend (Int, Iface) -> Unit) { // NOTE: It's not possible to have reified inside a virtual function // https://stackoverflow.com/questions/60037849/kotlin-reified-generic-in-virtual-function val classId = serialization.getClassId(Iface::class.java) @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE") - rmiConnectionSupport.createRemoteObject(getConnection(), classId, callback) + rmiConnectionSupport.createRemoteObject(getConnection(), classId, null, callback) } // diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 94dda419..ec7c1823 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -110,14 +110,14 @@ open class Server(config: ServerConfiguration = ServerC } - if (config.publicationPort <= 0) { throw newException("configuration port must be > 0") } - if (config.publicationPort >= 65535) { throw newException("configuration port must be < 65535") } + if (config.publicationPort <= 0) { throw ServerException("configuration port must be > 0") } + if (config.publicationPort >= 65535) { throw ServerException("configuration port must be < 65535") } - if (config.subscriptionPort <= 0) { throw newException("configuration controlPort must be > 0") } - if (config.subscriptionPort >= 65535) { throw newException("configuration controlPort must be < 65535") } + if (config.subscriptionPort <= 0) { throw ServerException("configuration controlPort must be > 0") } + if (config.subscriptionPort >= 65535) { throw ServerException("configuration controlPort must be < 65535") } - if (config.networkMtuSize <= 0) { throw newException("configuration networkMtuSize must be > 0") } - if (config.networkMtuSize >= 9 * 1024) { throw newException("configuration networkMtuSize must be < ${9 * 1024}") } + if (config.networkMtuSize <= 0) { throw ServerException("configuration networkMtuSize must be > 0") } + if (config.networkMtuSize >= 9 * 1024) { throw ServerException("configuration networkMtuSize must be < ${9 * 1024}") } autoClosableObjects.add(handshake) } diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 885512f2..7bb61528 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -238,7 +238,9 @@ open class Connection(val endPoint: EndPoint<*>, mediaDriverConnection: MediaDri */ suspend fun send(message: Any) { // The sessionId is globally unique, and is assigned by the server. - logger.debug("[{}] send: {}", publication.sessionId(), message) + logger.trace { + "[${publication.sessionId()}] send: $message" + } val kryo: KryoExtra = serialization.takeKryo() try { @@ -780,10 +782,37 @@ open class Connection(val endPoint: EndPoint<*>, mediaDriverConnection: MediaDri * * @see RemoteObject */ - suspend fun createObject(callback: suspend (Iface) -> Unit) { - val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 0) + suspend fun createObject(vararg objectParameters: Any?, callback: suspend (Int, Iface) -> Unit) { + val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 1) val interfaceClassId = endPoint.serialization.getClassId(iFaceClass) - rmiConnectionSupport.createRemoteObject(this, interfaceClassId, callback) + @Suppress("UNCHECKED_CAST") + objectParameters as Array + + rmiConnectionSupport.createRemoteObject(this, interfaceClassId, objectParameters, callback) + } + + /** + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + * + * The callback will be notified when the remote object has been created. + * + * Methods that return a value will throw [TimeoutException] if the response is not received with the + * response timeout [RemoteObject.responseTimeout]. + * + * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + * + * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `val remoteObject = test as RemoteObject` + * + * @see RemoteObject + */ + suspend fun createObject(callback: suspend (Int, Iface) -> Unit) { + val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function2::class.java, callback.javaClass, 1) + val interfaceClassId = endPoint.serialization.getClassId(iFaceClass) + + rmiConnectionSupport.createRemoteObject(this, interfaceClassId, null, callback) } } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index cd43cdb4..24dc9c0e 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -444,7 +444,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A internal suspend fun writeHandshakeMessage(publication: Publication, message: Any) { // The sessionId is globally unique, and is assigned by the server. - logger.debug("[{}] send: {}", publication.sessionId(), message) + logger.trace { + "[${publication.sessionId()}] send: $message" + } val kryo: KryoExtra = serialization.takeKryo() try { @@ -491,7 +493,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A val kryo: KryoExtra = serialization.takeKryo() try { val message = kryo.read(buffer, offset, length) - logger.debug("[{}] received: {}", header.sessionId(), message) + logger.trace { + "[${header.sessionId()}] received: $message" + } + return message } catch (e: Exception) { logger.error("Error de-serializing message on connection ${header.sessionId()}!", e) @@ -523,7 +528,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A val kryo: KryoExtra = serialization.takeKryo() try { message = kryo.read(buffer, offset, length, connection) - logger.debug("[{}] received: {}", sessionId, message) + logger.trace { + "[${sessionId}] received: ${message}" + } } catch (e: Exception) { listenerManager.notifyError(newException("[${sessionId}] Error de-serializing message", e)) } finally { diff --git a/src/dorkbox/network/connection/KryoExtra.kt b/src/dorkbox/network/connection/KryoExtra.kt index 89136e89..6b2a849f 100644 --- a/src/dorkbox/network/connection/KryoExtra.kt +++ b/src/dorkbox/network/connection/KryoExtra.kt @@ -191,7 +191,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap>) * + class and object bytes + * ++++++++++++++++++++++++++ */ - private fun read(reader: Input): Any { + fun read(reader: Input): Any { return readClassAndObject(reader) } diff --git a/src/dorkbox/network/connection/ListenerManager.kt b/src/dorkbox/network/connection/ListenerManager.kt index ab841274..cf63724a 100644 --- a/src/dorkbox/network/connection/ListenerManager.kt +++ b/src/dorkbox/network/connection/ListenerManager.kt @@ -175,7 +175,7 @@ internal class ListenerManager(private val logger: KLogg * * This method should not block for long periods as other network activity will not be processed until it returns. */ - suspend fun onMessage(function: suspend (CONNECTION, M) -> Unit) { + suspend fun onMessage(function: suspend (CONNECTION, MESSAGE) -> Unit) { onMessageMutex.withLock { // we have to follow the single-writer principle! diff --git a/src/dorkbox/network/rmi/RmiSupport.kt b/src/dorkbox/network/rmi/RmiSupport.kt index 1f906784..2db7af05 100644 --- a/src/dorkbox/network/rmi/RmiSupport.kt +++ b/src/dorkbox/network/rmi/RmiSupport.kt @@ -127,7 +127,7 @@ internal class RmiSupport(logger: KLogger, * called on "client" */ private fun onGenericObjectResponse(endPoint: EndPoint<*>, connection: Connection, logger: KLogger, - isGlobal: Boolean, rmiId: Int, callback: suspend (Any) -> Unit, + isGlobal: Boolean, rmiId: Int, callback: suspend (Int, Any) -> Unit, rmiSupportCache: RmiSupportCache, serialization: NetworkSerializationManager) { // we only create the proxy + execute the callback if the RMI id is valid! @@ -138,7 +138,7 @@ internal class RmiSupport(logger: KLogger, return } - val interfaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback::class.java, callback.javaClass, 0) + val interfaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback::class.java, callback.javaClass, 1) // create the client-side proxy object, if possible var proxyObject = rmiSupportCache.getProxyObject(rmiId) @@ -150,7 +150,7 @@ internal class RmiSupport(logger: KLogger, // this should be executed on a NEW coroutine! endPoint.actionDispatch.launch { try { - callback(proxyObject) + callback(rmiId, proxyObject) } catch (e: Exception) { logger.error("Error getting or creating the remote object $interfaceClass", e) } @@ -164,11 +164,11 @@ internal class RmiSupport(logger: KLogger, - internal fun registerCallback(callback: suspend (Iface) -> Unit): Int { + internal fun registerCallback(callback: suspend (Int, Iface) -> Unit): Int { return remoteObjectCreationCallbacks.register(callback) } - private fun removeCallback(callbackId: Int): suspend (Any) -> Unit { + private fun removeCallback(callbackId: Int): suspend (Int, Any) -> Unit { // callback's area always correct, because we track them ourselves. return remoteObjectCreationCallbacks.remove(callbackId)!! } @@ -382,12 +382,25 @@ internal class RmiSupport(logger: KLogger, private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) { val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) + val objectParameters = message.objectParameters + // We have to lookup the iface, since the proxy object requires it - val implObject = endPoint.serialization.createRmiObject(interfaceClassId) - val rmiId = saveImplObject(logger, implObject) + val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters) - // we send the message ANYWAYS, because the client needs to know it did NOT succeed! - connection.send(GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))) + val response = if (implObject is Exception) { + // whoops! + logger.error("Unable to create remote object!", implObject) + + // we send the message ANYWAYS, because the client needs to know it did NOT succeed! + GlobalObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId)) + } else { + val rmiId = saveImplObject(logger, implObject) + + // we send the message ANYWAYS, because the client needs to know it did NOT succeed! + GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId)) + } + + connection.send(response) } } diff --git a/src/dorkbox/network/rmi/RmiSupportConnection.kt b/src/dorkbox/network/rmi/RmiSupportConnection.kt index 34dbdd3b..38734a16 100644 --- a/src/dorkbox/network/rmi/RmiSupportConnection.kt +++ b/src/dorkbox/network/rmi/RmiSupportConnection.kt @@ -42,11 +42,11 @@ internal class RmiSupportConnection(logger: KLogger, /** * on the "client" to create a connection-specific remote object (that exists on the server) */ - suspend fun createRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) { + suspend fun createRemoteObject(connection: Connection, interfaceClassId: Int, objectParameters: Array?, callback: suspend (Int, Iface) -> Unit) { val callbackId = rmiGlobalSupport.registerCallback(callback) // There is no rmiID yet, because we haven't created it! - val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId)) + val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId), objectParameters) // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. @@ -62,25 +62,37 @@ internal class RmiSupportConnection(logger: KLogger, val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) + val objectParameters = message.objectParameters // We have to lookup the iface, since the proxy object requires it - val implObject = endPoint.serialization.createRmiObject(interfaceClassId) - val rmiId = saveImplObject(implObject) + val implObject = endPoint.serialization.createRmiObject(interfaceClassId, objectParameters) - if (rmiId != RemoteObjectStorage.INVALID_RMI) { - // this means we could register this object. + val response = if (implObject is Exception) { + // whoops! + logger.error("Unable to create remote object!", implObject) - // next, scan this object to see if there are any RMI fields - RmiSupport.scanImplForRmiFields(logger, implObject) { - saveImplObject(it) - } + // we send the message ANYWAYS, because the client needs to know it did NOT succeed! + ConnectionObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId)) } else { - logger.error { - "Trying to create an RMI object with the INVALID_RMI id!!" + val rmiId = saveImplObject(implObject) + + if (rmiId != RemoteObjectStorage.INVALID_RMI) { + // this means we could register this object. + + // next, scan this object to see if there are any RMI fields + RmiSupport.scanImplForRmiFields(logger, implObject) { + saveImplObject(it) + } + } else { + logger.error { + "Trying to create an RMI object with the INVALID_RMI id!!" + } } + + // we send the message ANYWAYS, because the client needs to know it did NOT succeed! + ConnectionObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId)) } - // we send the message ANYWAYS, because the client needs to know it did NOT succeed! - connection.send(ConnectionObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))) + connection.send(response) } } diff --git a/src/dorkbox/network/rmi/messages/ConnectionObjectCreateRequest.kt b/src/dorkbox/network/rmi/messages/ConnectionObjectCreateRequest.kt index c091d7a9..e33c2007 100644 --- a/src/dorkbox/network/rmi/messages/ConnectionObjectCreateRequest.kt +++ b/src/dorkbox/network/rmi/messages/ConnectionObjectCreateRequest.kt @@ -20,5 +20,21 @@ package dorkbox.network.rmi.messages * * @param interfaceClassId (LEFT) the Kryo interface class ID to create * @param callbackId (RIGHT) to know which callback to use when the object is created + * @param objectParameters the constructor parameters to create the object with */ -data class ConnectionObjectCreateRequest(val packedIds: Int) : RmiMessage +data class ConnectionObjectCreateRequest(val packedIds: Int, val objectParameters: Array?) : RmiMessage { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as ConnectionObjectCreateRequest + + if (packedIds != other.packedIds) return false + + return true + } + + override fun hashCode(): Int { + return packedIds + } +} diff --git a/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt b/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt index a1a039ba..9143c03a 100644 --- a/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt +++ b/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt @@ -20,6 +20,6 @@ package dorkbox.network.rmi.messages * * @param interfaceClassId (LEFT) the Kryo interface class ID to create * @param callbackId (RIGHT) to know which callback to use when the object is created + * @param objectParameters the constructor parameters to create the object with */ -data class GlobalObjectCreateRequest(val packedIds: Int) : RmiMessage -//a asd +data class GlobalObjectCreateRequest(val packedIds: Int, val objectParameters: Array?) : RmiMessage diff --git a/src/dorkbox/network/rmi/messages/ObjectResponseSerializer.kt b/src/dorkbox/network/rmi/messages/ObjectResponseSerializer.kt index 642221ab..eaab4466 100644 --- a/src/dorkbox/network/rmi/messages/ObjectResponseSerializer.kt +++ b/src/dorkbox/network/rmi/messages/ObjectResponseSerializer.kt @@ -49,6 +49,7 @@ import dorkbox.network.connection.KryoExtra */ class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap, Class<*>>) : Serializer(false) { override fun write(kryo: Kryo, output: Output, `object`: Any) { + println(" FIX ObjectResponseSerializer ") val kryoExtra = kryo as KryoExtra // val id = kryoExtra.rmiSupport.getRegisteredId(`object`) // // output.writeInt(id, true) @@ -56,6 +57,7 @@ class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap, } override fun read(kryo: Kryo, input: Input, implementationType: Class<*>): Any? { + println(" FIX ObjectResponseSerializer ") val kryoExtra = kryo as KryoExtra val objectID = input.readInt(true) @@ -66,3 +68,5 @@ class ObjectResponseSerializer(private val rmiImplToIface: IdentityMap, return null } } + +// TODO: FIX THIS CLASS MAYBE! diff --git a/src/dorkbox/network/serialization/NetworkSerializationManager.kt b/src/dorkbox/network/serialization/NetworkSerializationManager.kt index c7e6d882..5f92d949 100644 --- a/src/dorkbox/network/serialization/NetworkSerializationManager.kt +++ b/src/dorkbox/network/serialization/NetworkSerializationManager.kt @@ -19,59 +19,9 @@ import com.esotericsoftware.kryo.Serializer import dorkbox.network.connection.KryoExtra import dorkbox.network.rmi.CachedMethod import dorkbox.util.serialization.SerializationManager +import org.agrona.DirectBuffer -interface NetworkSerializationManager : SerializationManager { -// /** -// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. -// * -// * -// * There is a small speed penalty if there were no kryo's available to use. -// */ -// @Throws(IOException::class) -// fun write(connection: Connection_, message: Any) - -// /** -// * Reads an object from the buffer. -// * -// * @param length should ALWAYS be the length of the expected object! -// */ -// @Throws(IOException::class) -// fun read(connection: Connection_, length: Int): Any -// -// /** -// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. -// * -// * -// * There is a small speed penalty if there were no kryo's available to use. -// */ -// @Throws(IOException::class) -// fun writeWithCompression(connection: Connection_, message: Any) -// -// /** -// * Reads an object from the buffer. -// * -// * @param length should ALWAYS be the length of the expected object! -// */ -// @Throws(IOException::class) -// fun readWithCompression(connection: Connection_, length: Int): Any - -// /** -// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. -// * -// * -// * There is a small speed penalty if there were no kryo's available to use. -// */ -// @Throws(IOException::class) -// fun writeWithCrypto(connection: Connection_, message: Any) -// -// /** -// * Reads an object from the buffer. -// * -// * @param length should ALWAYS be the length of the expected object! -// */ -// @Throws(IOException::class) -// fun readWithCrypto(connection: Connection_, length: Int): Any - +interface NetworkSerializationManager : SerializationManager { /** * Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer]. * If the class is already registered, the existing entry is updated with the new serializer. @@ -153,7 +103,7 @@ interface NetworkSerializationManager : SerializationManager { * * @return the corresponding implementation object */ - fun createRmiObject(interfaceClassId: Int): Any + fun createRmiObject(interfaceClassId: Int, objectParameters: Array?): Any /** * Returns the Kryo class registration ID diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index c8951606..352e4dd7 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -26,6 +26,8 @@ import com.esotericsoftware.kryo.util.IdentityMap import dorkbox.network.connection.KryoExtra import dorkbox.network.connection.ping.PingMessage import dorkbox.network.handshake.Message +import dorkbox.network.pipeline.AeronInput +import dorkbox.network.pipeline.AeronOutput import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.messages.* @@ -33,14 +35,16 @@ import dorkbox.objectPool.ObjectPool import dorkbox.objectPool.PoolableObject import dorkbox.os.OS import dorkbox.util.serialization.SerializationDefaults -import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled +import org.agrona.DirectBuffer +import org.agrona.MutableDirectBuffer import org.agrona.collections.Int2ObjectHashMap import org.objenesis.instantiator.ObjectInstantiator import org.objenesis.strategy.StdInstantiatorStrategy import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.IOException +import java.lang.reflect.Constructor import java.lang.reflect.InvocationHandler /** @@ -549,8 +553,47 @@ class Serialization(references: Boolean, * * @return the corresponding implementation object */ - override fun createRmiObject(interfaceClassId: Int): Any { - return rmiIfaceToInstantiator[interfaceClassId].newInstance() + override fun createRmiObject(interfaceClassId: Int, objectParameters: Array?): Any { + try { + if (objectParameters == null) { + return rmiIfaceToInstantiator[interfaceClassId].newInstance() + } + + val size = objectParameters.size + + // we have to get the constructor for this object. + val clazz = getRmiImpl(getClassFromId(interfaceClassId)) + val constructors = clazz.declaredConstructors + + // now have to find the closest match. + val matchedBySize = constructors.filter { it.parameterCount == size } + + if (matchedBySize.size == 1) { + // this is our only option + return matchedBySize[0].newInstance(*objectParameters) + } + + // have to match by type + val matchedByType = mutableListOf>>() + objectParameters.forEachIndexed { index, any -> + if (any != null) { + matchedBySize.forEach { singleConstructor -> + var matchCount = 0 + if (singleConstructor.parameterTypes[index] == any::class.java) { + matchCount++ + } + + matchedByType.add(Pair(matchCount, singleConstructor)) + } + } + } + + // find the constructor with the highest match + matchedByType.sortByDescending { it.first } + return matchedByType[0].second.newInstance(*objectParameters) + } catch(e: Exception) { + return e + } } @@ -583,14 +626,14 @@ class Serialization(references: Boolean, * There is a small speed penalty if there were no kryo's available to use. */ @Throws(IOException::class) - override fun write(buffer: ByteBuf, message: Any) { -// val kryo = kryoPool.take() -// try { -// kryo.writeClassAndObject(buffer, message) -// kryo.write(NOP_CONNECTION, message) -// } finally { -// kryoPool.put(kryo) -// } + override fun write(buffer: DirectBuffer, message: Any) { + val kryo = kryoPool.take() + try { + val output = AeronOutput(buffer as MutableDirectBuffer) + kryo.writeClassAndObject(output, message) + } finally { + kryoPool.put(kryo) + } } /** @@ -602,22 +645,14 @@ class Serialization(references: Boolean, * @param length should ALWAYS be the length of the expected object! */ @Throws(IOException::class) - override fun read(buffer: ByteBuf, length: Int): Any? { -// val kryo = kryoPool.take() -// return try { -// if (wireReadLogger.isTraceEnabled) { -// val start = buffer.readerIndex() -// val `object` = kryo.read(buffer) -// val end = buffer.readerIndex() -// wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)) -// `object` -// } else { -// kryo.read(NOP_CONNECTION, buffer) -// } -// } finally { -// kryoPool.put(kryo) -// } - return null + override fun read(buffer: DirectBuffer, length: Int): Any? { + val kryo = kryoPool.take() + return try { + val input = AeronInput(buffer) + kryo.readClassAndObject(input) + } finally { + kryoPool.put(kryo) + } } /** diff --git a/test/dorkboxTest/network/PingPongTest.kt b/test/dorkboxTest/network/PingPongTest.kt index c4e62c57..86229145 100644 --- a/test/dorkboxTest/network/PingPongTest.kt +++ b/test/dorkboxTest/network/PingPongTest.kt @@ -22,8 +22,8 @@ package dorkboxTest.network import dorkbox.network.Client import dorkbox.network.Server import dorkbox.network.connection.Connection +import dorkbox.network.serialization.NetworkSerializationManager import dorkbox.util.exceptions.SecurityException -import dorkbox.util.serialization.SerializationManager import kotlinx.coroutines.runBlocking import org.junit.Assert import org.junit.Test @@ -104,7 +104,7 @@ class PingPongTest : BaseTest() { } } - private fun register(manager: SerializationManager) { + private fun register(manager: NetworkSerializationManager) { manager.register(Data::class.java) } diff --git a/test/dorkboxTest/network/rmi/RmiTest.kt b/test/dorkboxTest/network/rmi/RmiTest.kt index 4c3885f8..1c33a69e 100644 --- a/test/dorkboxTest/network/rmi/RmiTest.kt +++ b/test/dorkboxTest/network/rmi/RmiTest.kt @@ -68,7 +68,7 @@ class RmiTest : BaseTest() { Assert.assertFalse(s == remoteObject.toString()) test.moo() test.moo("Cow") - Assert.assertEquals(remoteObjectID.toLong(), test.id().toLong()) + Assert.assertEquals(remoteObjectID, test.id()) // Test that RMI correctly waits for the remotely invoked method to exit remoteObject.responseTimeout = 5000 @@ -135,6 +135,7 @@ class RmiTest : BaseTest() { m.number = 678 m.text = "sometext" connection.send(m) + println("Finished tests") } @@ -150,20 +151,12 @@ class RmiTest : BaseTest() { @Throws(SecurityException::class, IOException::class, InterruptedException::class) fun rmiNetworkGlobal() { rmiGlobal() - - // have to reset the object ID counter - TestCowImpl.ID_COUNTER.set(1) - Thread.sleep(2000L) } @Test @Throws(SecurityException::class, IOException::class, InterruptedException::class) fun rmiNetworkConnection() { rmi() - - // have to reset the object ID counter - TestCowImpl.ID_COUNTER.set(1) - Thread.sleep(2000L) } @Test @@ -174,10 +167,6 @@ class RmiTest : BaseTest() { configuration.listenIpAddress = LOOPBACK } } - - // have to reset the object ID counter - TestCowImpl.ID_COUNTER.set(1) - Thread.sleep(2000L) } @Throws(SecurityException::class, IOException::class) @@ -199,14 +188,14 @@ class RmiTest : BaseTest() { System.err.println("Received finish signal for test for: Client -> Server") val `object` = m.testCow val id = `object`.id() - Assert.assertEquals(1, id.toLong()) + Assert.assertEquals(23, id.toLong()) System.err.println("Finished test for: Client -> Server") - System.err.println("Starting test for: Server -> Client") - // normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug - connection.createObject { remoteObject -> + + System.err.println("Starting test for: Server -> Client") + connection.createObject(123) { rmiId, remoteObject -> System.err.println("Running test for: Server -> Client") - runTests(connection, remoteObject, 2) + runTests(connection, remoteObject, 123) System.err.println("Done with test for: Server -> Client") } } @@ -224,9 +213,9 @@ class RmiTest : BaseTest() { addEndPoint(client) client.onConnect { connection -> - connection.createObject { remoteObject -> + connection.createObject(23) { rmiId, remoteObject -> System.err.println("Running test for: Client -> Server") - runTests(connection, remoteObject, 1) + runTests(connection, remoteObject, 23) System.err.println("Done with test for: Client -> Server") } } @@ -235,7 +224,7 @@ class RmiTest : BaseTest() { System.err.println("Received finish signal for test for: Client -> Server") val `object` = m.testCow val id = `object`.id() - Assert.assertEquals(2, id.toLong()) + Assert.assertEquals(123, id.toLong()) System.err.println("Finished test for: Client -> Server") stopEndPoints(2000) } @@ -269,14 +258,14 @@ class RmiTest : BaseTest() { val `object` = m.testCow val id = `object`.id() - Assert.assertEquals(1, id.toLong()) + Assert.assertEquals(44, id.toLong()) System.err.println("Finished test for: Client -> Server") // normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug - connection.createObject { remoteObject -> + connection.createObject(4) { rmiId, remoteObject -> System.err.println("Running test for: Server -> Client") - runTests(connection, remoteObject, 2) + runTests(connection, remoteObject, 4) System.err.println("Done with test for: Server -> Client") } } @@ -296,7 +285,7 @@ class RmiTest : BaseTest() { System.err.println("Received finish signal for test for: Client -> Server") val `object` = m.testCow val id = `object`.id() - Assert.assertEquals(2, id.toLong()) + Assert.assertEquals(4, id.toLong()) System.err.println("Finished test for: Client -> Server") stopEndPoints(2000) } @@ -307,9 +296,9 @@ class RmiTest : BaseTest() { System.err.println("Starting test for: Client -> Server") // this creates a GLOBAL object on the server (instead of a connection specific object) - client.createObject { remoteObject -> + client.createObject(44) { rmiId, remoteObject -> System.err.println("Running test for: Client -> Server") - runTests(client.getConnection(), remoteObject, 1) + runTests(client.getConnection(), remoteObject, 44) System.err.println("Done with test for: Client -> Server") } } diff --git a/test/dorkboxTest/network/rmi/classes/TestCowBaseImpl.kt b/test/dorkboxTest/network/rmi/classes/TestCowBaseImpl.kt index 90b73812..3b28bbfb 100644 --- a/test/dorkboxTest/network/rmi/classes/TestCowBaseImpl.kt +++ b/test/dorkboxTest/network/rmi/classes/TestCowBaseImpl.kt @@ -16,7 +16,6 @@ package dorkboxTest.network.rmi.classes open class TestCowBaseImpl : TestCowBase { override fun throwException() { - System.err.println("The following exception is EXPECTED, but should only be on one log!") throw UnsupportedOperationException("Why would I do that?") } diff --git a/test/dorkboxTest/network/rmi/classes/TestCowImpl.kt b/test/dorkboxTest/network/rmi/classes/TestCowImpl.kt index f4c41f3b..c2d3b87a 100644 --- a/test/dorkboxTest/network/rmi/classes/TestCowImpl.kt +++ b/test/dorkboxTest/network/rmi/classes/TestCowImpl.kt @@ -14,18 +14,10 @@ */ package dorkboxTest.network.rmi.classes -import java.util.concurrent.atomic.AtomicInteger - -class TestCowImpl : TestCowBaseImpl(), - TestCow { - - companion object { - // has to start at 1 - val ID_COUNTER = AtomicInteger(1) - } +class TestCowImpl(val id: Int) : TestCowBaseImpl(), TestCow { private var moos = 0 - private val id = ID_COUNTER.getAndIncrement() + override fun moo() { moos++ println("Moo! $moos")