Simplified RMI Response manager method invocations and access. code polish

This commit is contained in:
nathan 2020-08-21 00:27:08 +02:00
parent c117c90ee5
commit 0a9ae32595
5 changed files with 125 additions and 88 deletions

View File

@ -72,7 +72,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
private val previousClosedConnectionActivity: Long = 0
private val handshake = ClientHandshake(logger, config, crypto, listenerManager)
private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization, actionDispatch)
private val rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, serialization)
init {
// have to do some basic validation of our configuration

View File

@ -27,8 +27,14 @@ import java.lang.reflect.Method
import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
object MyUnconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) = block.run() // !!!
}
/**
* Handles network communication when methods are invoked on a proxy.
@ -40,14 +46,14 @@ import kotlin.coroutines.resumeWithException
* @param rmiObjectId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param connection this is really the network client -- there is ONLY ever 1 connection
* @param proxyString this is the name assigned to the proxy [toString] method
* @param rmiObjectCache is used to provide RMI support
* @param responseManager is used to provide RMI request/response support
* @param cachedMethods this is the methods available for the specified class
*/
internal class RmiClient(val isGlobal: Boolean,
val rmiObjectId: Int,
private val connection: Connection,
private val proxyString: String,
private val rmiObjectCache: RmiObjectCache,
private val responseManager: RmiResponseManager,
private val cachedMethods: Array<CachedMethod>) : InvocationHandler {
companion object {
@ -90,8 +96,6 @@ internal class RmiClient(val isGlobal: Boolean,
// response (even if it is a void response). This simplifies our response mask, and lets us use more bits for storing the
// response ID
val responseStorage = rmiObjectCache.getResponseStorage()
// NOTE: we ALWAYS send a response from the remote end.
//
// 'async' -> DO NOT WAIT
@ -102,7 +106,7 @@ internal class RmiClient(val isGlobal: Boolean,
// If we are async, we ignore the response....
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code excution is in lockstep
val rmiWaiter = responseStorage.prep(isAsync)
val rmiWaiter = responseManager.prep(isAsync)
invokeMethod.isGlobal = isGlobal
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
@ -112,7 +116,7 @@ internal class RmiClient(val isGlobal: Boolean,
// if we are async, then this will immediately return
return responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis)
return responseManager.waitForReply(isAsync, rmiWaiter, timeoutMillis)
}
@Suppress("DuplicatedCode")
@ -200,47 +204,92 @@ internal class RmiClient(val isGlobal: Boolean,
// async will return immediately
var returnValue: Any? = null
if (suspendCoroutineObject is Continuation<*>) {
// https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe
// https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class
// https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391
/**
* https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667
* https://github.com/square/retrofit/blob/bfb5cd375a300658dae48e29fa03d0ab553c8cf6/retrofit/src/main/java/retrofit2/KotlinExtensions.kt
* https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/HttpServiceMethod.java
* https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/Utils.java
* https://github.com/square/retrofit/tree/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2
*/
// returnValue = try {
// invokeSuspendFunction(suspendCoroutineObject) {
//// kotlinx.coroutines.suspendCancellableCoroutine<Any?> { continuation: Any? ->
//// continuation.resume(body)
//// }
// val continuation = suspendCoroutineObject as Continuation<Any?>
//
//// withContext(Dispatchers.Unconfined) {
// delay(100)
// sendRequest(invokeMethod)
//// }
//
// val suspendFunction: suspend () -> Any? = {
// val rmiResult = sendRequest(invokeMethod)
// println("RMI: ${rmiResult?.javaClass}")
// println(1)
// delay(3000)
// println(2)
// }
// val suspendFunction1: Function1<Continuation<Any?>, *> = suspendFunction as Function1<Continuation<Any?>?, *>
// returnValue = suspendFunction1.invoke(Continuation(EmptyCoroutineContext) {
// it.getOrNull().apply {
// continuation.resume(this)
// }
// })
//
// System.err.println("have suspend ret value ${returnValue?.javaClass}")
//
//// returnValue = invokeSuspendFunction(continuation, suspendFunction)
//
// // https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe
// // https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class
// // https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391
//
//
// // NOTE:
// // Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes
// // invoke the supplied callback with an exception before the invoking stack frame can return.
// // Coroutines will intercept the subsequent invocation of the Continuation and throw the
// // exception synchronously. A Java Proxy cannot throw checked exceptions without them being
// // declared on the interface method. To avoid the synchronous checked exception being wrapped
// // in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will
// // force suspension to occur so that it can be instead delivered to the continuation to
// // bypass this restriction.
//
// /**
// * https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/
// * https://github.com/Kotlin/kotlinx.coroutines/pull/1667
// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/KotlinExtensions.kt
// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/HttpServiceMethod.java
// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/Utils.java
// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2
// */
//// returnValue = try {
//// val actualContinuation = suspendCoroutineObject.intercepted() as Continuation<Any?>
////// suspend {
////// try {
////// delay(100)
////// sendRequest(invokeMethod)
////// } catch (e: Exception) {
////// yield()
////// throw e
////// }
////// }.startCoroutineUninterceptedOrReturn(actualContinuation)
////
//// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable {
//// invokeSuspendFunction(suspendCoroutineObject) {
//// invokeSuspendFunction(actualContinuation) {
////
//// }
//// })
//
// } catch (e: Exception) {
// e.printStackTrace()
// }
//
// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) {
// // we were suspend, and when we unsuspend, we will pick up where we left off
// return returnValue
// }
////
//////// kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn<Any?> {
//// delay(100)
//// sendRequest(invokeMethod)
//////// }
////////
////// kotlinx.coroutines.suspendCancellableCoroutine<Any?> { continuation: Any? ->
////// resume(body)
////// }
////// withContext(MyUnconfined) {
//////
////// }
//// }
////
////// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable {
////// invokeSuspendFunction(suspendCoroutineObject) {
//////
////// }
////// })
////
//// } catch (e: Exception) {
//// e.printStackTrace()
//// }
////
//// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) {
//// // we were suspend, and when we unsuspend, we will pick up where we left off
//// return returnValue
//// }
// if this was an exception, we want to get it out!
returnValue = runBlocking {
@ -299,12 +348,12 @@ internal class RmiClient(val isGlobal: Boolean,
ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
throw exception
}
is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue)
throw returnValue
}
// is Exception -> {
// // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
// ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue)
// throw returnValue
// }
else -> {
return returnValue
}
@ -333,8 +382,12 @@ internal class RmiClient(val isGlobal: Boolean,
// trampoline so we can access suspend functions correctly and (if suspend) get the coroutine connection parameter)
private fun invokeSuspendFunction(continuation: Continuation<*>, suspendFunction: suspend () -> Any?): Any {
return SuspendFunctionTrampoline.invoke(continuation, suspendFunction) as Any
private fun invokeSuspendFunction(continuation: Continuation<Any?>, suspendFunction: suspend () -> Any?): Any {
return SuspendFunctionTrampoline.invoke(Continuation<Any?>(EmptyCoroutineContext) {
it.getOrNull().apply {
continuation.resume(this)
}
}, suspendFunction) as Any
}
override fun hashCode(): Int {

View File

@ -21,14 +21,13 @@ import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.util.collections.LockFreeIntMap
import kotlinx.coroutines.CoroutineScope
import mu.KLogger
internal class RmiManagerConnections(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal,
private val serialization: NetworkSerializationManager,
actionDispatch: CoroutineScope) : RmiObjectCache(logger, actionDispatch) {
private val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
// It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections!
private val proxyObjects = LockFreeIntMap<RemoteObject>()
/**
@ -46,29 +45,21 @@ internal class RmiManagerConnections(logger: KLogger,
proxyObjects.put(rmiId, remoteObject)
}
private fun <Iface> createProxyObject(isGlobalObject: Boolean,
connection: Connection,
endPoint: EndPoint<*>,
objectId: Int,
interfaceClass: Class<Iface>) : Iface {
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = RmiManagerGlobal.createProxyObject(isGlobalObject, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass)
saveProxyObject(objectId, proxyObject)
}
@Suppress("UNCHECKED_CAST")
return proxyObject as Iface
}
/**
* on the connection+client to get a connection-specific remote object (that exists on the server/client)
*/
fun <Iface> getRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class<Iface>): Iface {
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = RmiManagerGlobal.createProxyObject(false, connection, serialization, rmiGlobalSupport.rmiResponseManager,
endPoint.type.simpleName, objectId, interfaceClass)
saveProxyObject(objectId, proxyObject)
}
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
return createProxyObject(false, connection, endPoint, objectId, interfaceClass)
@Suppress("UNCHECKED_CAST")
return proxyObject as Iface
}

View File

@ -26,7 +26,6 @@ import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.util.classes.ClassHelper
import dorkbox.util.collections.LockFreeIntMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
@ -35,7 +34,7 @@ import java.util.*
internal class RmiManagerGlobal(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger, actionDispatch) {
internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
companion object {
/**
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
@ -54,7 +53,7 @@ internal class RmiManagerGlobal(logger: KLogger,
*/
internal fun createProxyObject(isGlobalObject: Boolean,
connection: Connection, serialization: NetworkSerializationManager,
rmiObjectCache: RmiObjectCache, namePrefix: String,
responseManager: RmiResponseManager, namePrefix: String,
rmiId: Int, interfaceClass: Class<*>): RemoteObject {
require(interfaceClass.isInterface) { "iface must be an interface." }
@ -69,7 +68,7 @@ internal class RmiManagerGlobal(logger: KLogger,
// the ACTUAL proxy is created in the connection impl. Our proxy handler MUST BE suspending because of:
// 1) how we send data on the wire
// 2) how we must (sometimes) wait for a response
val proxyObject = RmiClient(isGlobalObject, rmiId, connection, name, rmiObjectCache, cachedMethods)
val proxyObject = RmiClient(isGlobalObject, rmiId, connection, name, responseManager, cachedMethods)
// This is the interface inheritance by the proxy object
val interfaces: Array<Class<*>> = arrayOf(RemoteObject::class.java, interfaceClass)
@ -131,7 +130,7 @@ internal class RmiManagerGlobal(logger: KLogger,
}
}
private val proxyObjects = LockFreeIntMap<RemoteObject>()
val rmiResponseManager = RmiResponseManager(logger, actionDispatch)
// this is used for all connection specific ones as well.
private val remoteObjectCreationCallbacks = RemoteObjectStorage(logger)
@ -208,6 +207,7 @@ internal class RmiManagerGlobal(logger: KLogger,
override fun close() {
super.close()
rmiResponseManager.close()
remoteObjectCreationCallbacks.close()
}
@ -232,7 +232,7 @@ internal class RmiManagerGlobal(logger: KLogger,
// create the client-side proxy object, if possible. This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId)
if (proxyObject == null) {
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiObjectCache, endPoint.type.simpleName, rmiId, interfaceClass)
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, endPoint.type.simpleName, rmiId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject)
}
@ -255,7 +255,7 @@ internal class RmiManagerGlobal(logger: KLogger,
// so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = createProxyObject(true, connection, serialization, this, endPoint.type.simpleName, objectId, interfaceClass)
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, endPoint.type.simpleName, objectId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject)
}
@ -431,7 +431,7 @@ internal class RmiManagerGlobal(logger: KLogger,
}
is MethodResponse -> {
// notify the pending proxy requests that we have a response!
getResponseStorage().onMessage(message)
rmiResponseManager.onMessage(message)
}
}
}

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.rmi
import kotlinx.coroutines.CoroutineScope
import mu.KLogger
/**
@ -24,9 +23,8 @@ import mu.KLogger
* The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there
* would be conflicts in the data structure)
*/
internal open class RmiObjectCache(logger: KLogger, actionDispatch: CoroutineScope) {
internal open class RmiObjectCache(logger: KLogger) {
private val responseStorage = RmiResponseManager(logger, actionDispatch)
private val implObjects = RemoteObjectStorage(logger)
fun saveImplObject(rmiObject: Any): Int {
@ -46,12 +44,7 @@ internal open class RmiObjectCache(logger: KLogger, actionDispatch: CoroutineSco
return implObjects.remove(rmiId) as T?
}
fun getResponseStorage(): RmiResponseManager {
return responseStorage
}
open fun close() {
responseStorage.close()
implObjects.close()
}
}