diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 99209375..3262bad8 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -72,7 +72,7 @@ open class Client(config: Configuration = Configuration private val previousClosedConnectionActivity: Long = 0 private val handshake = ClientHandshake(logger, config, crypto, listenerManager) - private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization, actionDispatch) + private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization) init { // have to do some basic validation of our configuration diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index ed674c26..e66952aa 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -27,8 +27,14 @@ import java.lang.reflect.Method import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +object MyUnconfined : CoroutineDispatcher() { + override fun isDispatchNeeded(context: CoroutineContext): Boolean = false + override fun dispatch(context: CoroutineContext, block: Runnable) = block.run() // !!! +} /** * Handles network communication when methods are invoked on a proxy. @@ -40,14 +46,14 @@ import kotlin.coroutines.resumeWithException * @param rmiObjectId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID * @param connection this is really the network client -- there is ONLY ever 1 connection * @param proxyString this is the name assigned to the proxy [toString] method - * @param rmiObjectCache is used to provide RMI support + * @param responseManager is used to provide RMI request/response support * @param cachedMethods this is the methods available for the specified class */ internal class RmiClient(val isGlobal: Boolean, val rmiObjectId: Int, private val connection: Connection, private val proxyString: String, - private val rmiObjectCache: RmiObjectCache, + private val responseManager: RmiResponseManager, private val cachedMethods: Array) : InvocationHandler { companion object { @@ -90,8 +96,6 @@ internal class RmiClient(val isGlobal: Boolean, // response (even if it is a void response). This simplifies our response mask, and lets us use more bits for storing the // response ID - val responseStorage = rmiObjectCache.getResponseStorage() - // NOTE: we ALWAYS send a response from the remote end. // // 'async' -> DO NOT WAIT @@ -102,7 +106,7 @@ internal class RmiClient(val isGlobal: Boolean, // If we are async, we ignore the response.... // The response, even if there is NOT one (ie: not void) will always return a thing (so our code excution is in lockstep - val rmiWaiter = responseStorage.prep(isAsync) + val rmiWaiter = responseManager.prep(isAsync) invokeMethod.isGlobal = isGlobal invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id) @@ -112,7 +116,7 @@ internal class RmiClient(val isGlobal: Boolean, // if we are async, then this will immediately return - return responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis) + return responseManager.waitForReply(isAsync, rmiWaiter, timeoutMillis) } @Suppress("DuplicatedCode") @@ -200,47 +204,92 @@ internal class RmiClient(val isGlobal: Boolean, // async will return immediately var returnValue: Any? = null if (suspendCoroutineObject is Continuation<*>) { - - // https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe - // https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class - // https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391 - - - /** - * https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/ - * https://github.com/Kotlin/kotlinx.coroutines/pull/1667 - * https://github.com/square/retrofit/blob/bfb5cd375a300658dae48e29fa03d0ab553c8cf6/retrofit/src/main/java/retrofit2/KotlinExtensions.kt - * https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/HttpServiceMethod.java - * https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/Utils.java - * https://github.com/square/retrofit/tree/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2 - */ -// returnValue = try { -// invokeSuspendFunction(suspendCoroutineObject) { -//// kotlinx.coroutines.suspendCancellableCoroutine { continuation: Any? -> -//// continuation.resume(body) -//// } +// val continuation = suspendCoroutineObject as Continuation // -//// withContext(Dispatchers.Unconfined) { -// delay(100) -// sendRequest(invokeMethod) -//// } +// +// val suspendFunction: suspend () -> Any? = { +// val rmiResult = sendRequest(invokeMethod) +// println("RMI: ${rmiResult?.javaClass}") +// println(1) +// delay(3000) +// println(2) +// } +// val suspendFunction1: Function1, *> = suspendFunction as Function1?, *> +// returnValue = suspendFunction1.invoke(Continuation(EmptyCoroutineContext) { +// it.getOrNull().apply { +// continuation.resume(this) // } +// }) // +// System.err.println("have suspend ret value ${returnValue?.javaClass}") +// +//// returnValue = invokeSuspendFunction(continuation, suspendFunction) +// +// // https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe +// // https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class +// // https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391 +// +// +// // NOTE: +// // Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes +// // invoke the supplied callback with an exception before the invoking stack frame can return. +// // Coroutines will intercept the subsequent invocation of the Continuation and throw the +// // exception synchronously. A Java Proxy cannot throw checked exceptions without them being +// // declared on the interface method. To avoid the synchronous checked exception being wrapped +// // in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will +// // force suspension to occur so that it can be instead delivered to the continuation to +// // bypass this restriction. +// +// /** +// * https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/ +// * https://github.com/Kotlin/kotlinx.coroutines/pull/1667 +// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/KotlinExtensions.kt +// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/HttpServiceMethod.java +// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/Utils.java +// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2 +// */ +//// returnValue = try { +//// val actualContinuation = suspendCoroutineObject.intercepted() as Continuation +////// suspend { +////// try { +////// delay(100) +////// sendRequest(invokeMethod) +////// } catch (e: Exception) { +////// yield() +////// throw e +////// } +////// }.startCoroutineUninterceptedOrReturn(actualContinuation) //// -//// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable { -//// invokeSuspendFunction(suspendCoroutineObject) { +//// invokeSuspendFunction(actualContinuation) { //// -//// } -//// }) -// -// } catch (e: Exception) { -// e.printStackTrace() -// } -// -// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) { -// // we were suspend, and when we unsuspend, we will pick up where we left off -// return returnValue -// } +//// +//////// kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn { +//// delay(100) +//// sendRequest(invokeMethod) +//////// } +//////// +////// kotlinx.coroutines.suspendCancellableCoroutine { continuation: Any? -> +////// resume(body) +////// } +////// withContext(MyUnconfined) { +////// +////// } +//// } +//// +////// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable { +////// invokeSuspendFunction(suspendCoroutineObject) { +////// +////// } +////// }) +//// +//// } catch (e: Exception) { +//// e.printStackTrace() +//// } +//// +//// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) { +//// // we were suspend, and when we unsuspend, we will pick up where we left off +//// return returnValue +//// } // if this was an exception, we want to get it out! returnValue = runBlocking { @@ -299,12 +348,12 @@ internal class RmiClient(val isGlobal: Boolean, ListenerManager.cleanStackTrace(exception, RmiClient::class.java) throw exception } - is Exception -> { - // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call - // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) - ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue) - throw returnValue - } +// is Exception -> { +// // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call +// // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) +// ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue) +// throw returnValue +// } else -> { return returnValue } @@ -333,8 +382,12 @@ internal class RmiClient(val isGlobal: Boolean, // trampoline so we can access suspend functions correctly and (if suspend) get the coroutine connection parameter) - private fun invokeSuspendFunction(continuation: Continuation<*>, suspendFunction: suspend () -> Any?): Any { - return SuspendFunctionTrampoline.invoke(continuation, suspendFunction) as Any + private fun invokeSuspendFunction(continuation: Continuation, suspendFunction: suspend () -> Any?): Any { + return SuspendFunctionTrampoline.invoke(Continuation(EmptyCoroutineContext) { + it.getOrNull().apply { + continuation.resume(this) + } + }, suspendFunction) as Any } override fun hashCode(): Int { diff --git a/src/dorkbox/network/rmi/RmiManagerConnections.kt b/src/dorkbox/network/rmi/RmiManagerConnections.kt index be5c1012..15acdcbb 100644 --- a/src/dorkbox/network/rmi/RmiManagerConnections.kt +++ b/src/dorkbox/network/rmi/RmiManagerConnections.kt @@ -21,14 +21,13 @@ import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse import dorkbox.network.serialization.NetworkSerializationManager import dorkbox.util.collections.LockFreeIntMap -import kotlinx.coroutines.CoroutineScope import mu.KLogger internal class RmiManagerConnections(logger: KLogger, val rmiGlobalSupport: RmiManagerGlobal, - private val serialization: NetworkSerializationManager, - actionDispatch: CoroutineScope) : RmiObjectCache(logger, actionDispatch) { + private val serialization: NetworkSerializationManager) : RmiObjectCache(logger) { + // It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections! private val proxyObjects = LockFreeIntMap() /** @@ -46,29 +45,21 @@ internal class RmiManagerConnections(logger: KLogger, proxyObjects.put(rmiId, remoteObject) } - private fun createProxyObject(isGlobalObject: Boolean, - connection: Connection, - endPoint: EndPoint<*>, - objectId: Int, - interfaceClass: Class) : Iface { - - // so we can just instantly create the proxy object (or get the cached one) - var proxyObject = getProxyObject(objectId) - if (proxyObject == null) { - proxyObject = RmiManagerGlobal.createProxyObject(isGlobalObject, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass) - saveProxyObject(objectId, proxyObject) - } - - @Suppress("UNCHECKED_CAST") - return proxyObject as Iface - } - /** * on the connection+client to get a connection-specific remote object (that exists on the server/client) */ fun getRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class): Iface { + // so we can just instantly create the proxy object (or get the cached one) + var proxyObject = getProxyObject(objectId) + if (proxyObject == null) { + proxyObject = RmiManagerGlobal.createProxyObject(false, connection, serialization, rmiGlobalSupport.rmiResponseManager, + endPoint.type.simpleName, objectId, interfaceClass) + saveProxyObject(objectId, proxyObject) + } + // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! - return createProxyObject(false, connection, endPoint, objectId, interfaceClass) + @Suppress("UNCHECKED_CAST") + return proxyObject as Iface } diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index a7814374..a0537854 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -26,7 +26,6 @@ import dorkbox.network.rmi.messages.MethodRequest import dorkbox.network.rmi.messages.MethodResponse import dorkbox.network.serialization.NetworkSerializationManager import dorkbox.util.classes.ClassHelper -import dorkbox.util.collections.LockFreeIntMap import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import mu.KLogger @@ -35,7 +34,7 @@ import java.util.* internal class RmiManagerGlobal(logger: KLogger, actionDispatch: CoroutineScope, - internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger, actionDispatch) { + internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger) { companion object { /** * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked @@ -54,7 +53,7 @@ internal class RmiManagerGlobal(logger: KLogger, */ internal fun createProxyObject(isGlobalObject: Boolean, connection: Connection, serialization: NetworkSerializationManager, - rmiObjectCache: RmiObjectCache, namePrefix: String, + responseManager: RmiResponseManager, namePrefix: String, rmiId: Int, interfaceClass: Class<*>): RemoteObject { require(interfaceClass.isInterface) { "iface must be an interface." } @@ -69,7 +68,7 @@ internal class RmiManagerGlobal(logger: KLogger, // the ACTUAL proxy is created in the connection impl. Our proxy handler MUST BE suspending because of: // 1) how we send data on the wire // 2) how we must (sometimes) wait for a response - val proxyObject = RmiClient(isGlobalObject, rmiId, connection, name, rmiObjectCache, cachedMethods) + val proxyObject = RmiClient(isGlobalObject, rmiId, connection, name, responseManager, cachedMethods) // This is the interface inheritance by the proxy object val interfaces: Array> = arrayOf(RemoteObject::class.java, interfaceClass) @@ -131,7 +130,7 @@ internal class RmiManagerGlobal(logger: KLogger, } } - private val proxyObjects = LockFreeIntMap() + val rmiResponseManager = RmiResponseManager(logger, actionDispatch) // this is used for all connection specific ones as well. private val remoteObjectCreationCallbacks = RemoteObjectStorage(logger) @@ -208,6 +207,7 @@ internal class RmiManagerGlobal(logger: KLogger, override fun close() { super.close() + rmiResponseManager.close() remoteObjectCreationCallbacks.close() } @@ -232,7 +232,7 @@ internal class RmiManagerGlobal(logger: KLogger, // create the client-side proxy object, if possible. This MUST be an object that is saved for the connection var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId) if (proxyObject == null) { - proxyObject = createProxyObject(isGlobal, connection, serialization, rmiObjectCache, endPoint.type.simpleName, rmiId, interfaceClass) + proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, endPoint.type.simpleName, rmiId, interfaceClass) connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject) } @@ -255,7 +255,7 @@ internal class RmiManagerGlobal(logger: KLogger, // so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId) if (proxyObject == null) { - proxyObject = createProxyObject(true, connection, serialization, this, endPoint.type.simpleName, objectId, interfaceClass) + proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, endPoint.type.simpleName, objectId, interfaceClass) connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject) } @@ -431,7 +431,7 @@ internal class RmiManagerGlobal(logger: KLogger, } is MethodResponse -> { // notify the pending proxy requests that we have a response! - getResponseStorage().onMessage(message) + rmiResponseManager.onMessage(message) } } } diff --git a/src/dorkbox/network/rmi/RmiObjectCache.kt b/src/dorkbox/network/rmi/RmiObjectCache.kt index a50cb9fb..ff248dcf 100644 --- a/src/dorkbox/network/rmi/RmiObjectCache.kt +++ b/src/dorkbox/network/rmi/RmiObjectCache.kt @@ -15,7 +15,6 @@ */ package dorkbox.network.rmi -import kotlinx.coroutines.CoroutineScope import mu.KLogger /** @@ -24,9 +23,8 @@ import mu.KLogger * The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there * would be conflicts in the data structure) */ -internal open class RmiObjectCache(logger: KLogger, actionDispatch: CoroutineScope) { +internal open class RmiObjectCache(logger: KLogger) { - private val responseStorage = RmiResponseManager(logger, actionDispatch) private val implObjects = RemoteObjectStorage(logger) fun saveImplObject(rmiObject: Any): Int { @@ -46,12 +44,7 @@ internal open class RmiObjectCache(logger: KLogger, actionDispatch: CoroutineSco return implObjects.remove(rmiId) as T? } - fun getResponseStorage(): RmiResponseManager { - return responseStorage - } - open fun close() { - responseStorage.close() implObjects.close() } }