rmiID is ALWAYS on the right for packed IDs. Fixed rmiID unsigned operations. WIP suspend proxy invocations

This commit is contained in:
nathan 2020-08-20 13:08:36 +02:00
parent ee5e9eb24e
commit 698701dfdc
11 changed files with 302 additions and 109 deletions

View File

@ -19,11 +19,15 @@ import dorkbox.network.connection.Connection
import dorkbox.network.connection.ListenerManager
import dorkbox.network.other.coroutines.SuspendFunctionTrampoline
import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@ -76,7 +80,7 @@ internal class RmiClient(val isGlobal: Boolean,
private var enableEquals = false
// if we are ASYNC, then this method immediately returns
private suspend fun sendRequest(method: Method, args: Array<Any>): Any? {
private suspend fun sendRequest(invokeMethod: MethodRequest): Any? {
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
@ -100,43 +104,15 @@ internal class RmiClient(val isGlobal: Boolean,
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code excution is in lockstep
val rmiWaiter = responseStorage.prep(isAsync)
val invokeMethod = MethodRequest()
invokeMethod.isGlobal = isGlobal
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
invokeMethod.args = args // if this is a kotlin suspend function, the suspend arg will NOT be here!
// which method do we access? We always want to access the IMPLEMENTATION (if available!). we know that this will always succeed
// this should be accessed via the KRYO class ID + method index (both are SHORT, and can be packed)
invokeMethod.cachedMethod = cachedMethods.first { it.method == method }
connection.send(invokeMethod)
// if we are async, then this will immediately return
val result = responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis)
when (result) {
RmiResponseManager.TIMEOUT_EXCEPTION -> {
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
throw exception
}
is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, result)
throw result
}
else -> {
// val fancyName = RmiUtils.makeFancyMethodName(method)
// val exception = TimeoutException("Response timed out: $fancyName")
// // from top down, clean up the coroutine stack
// ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
// throw exception
return result
}
}
return responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis)
}
@Suppress("DuplicatedCode")
@ -202,70 +178,160 @@ internal class RmiClient(val isGlobal: Boolean,
}
}
// setup the RMI request
val invokeMethod = MethodRequest()
// if this is a kotlin suspend function, the continuation arg will NOT be here (it's replaced at runtime)!
invokeMethod.args = args ?: EMPTY_ARRAY
// which method do we access? We always want to access the IMPLEMENTATION (if available!). we know that this will always succeed
// this should be accessed via the KRYO class ID + method index (both are SHORT, and can be packed)
invokeMethod.cachedMethod = cachedMethods.first { it.method == method }
// if a 'suspend' function is called, then our last argument is a 'Continuation' object
// We will use this for our coroutine context instead of running on a new coroutine
val maybeContinuation = args?.lastOrNull()
val suspendCoroutineObject = args?.lastOrNull()
// async will return immediately
val returnValue =
// if (maybeContinuation is Continuation<*>) {
// try {
// invokeSuspendFunction(maybeContinuation) {
// sendRequest(method, args)
// }
// } catch (e: Exception) {
// println("EXCEPT!")
// }
// // if this was an exception, we want to get it out!
var returnValue: Any? = null
if (suspendCoroutineObject is Continuation<*>) {
// https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe
// https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class
// https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391
/**
* https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667
* https://github.com/square/retrofit/blob/bfb5cd375a300658dae48e29fa03d0ab553c8cf6/retrofit/src/main/java/retrofit2/KotlinExtensions.kt
* https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/HttpServiceMethod.java
* https://github.com/square/retrofit/blob/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2/Utils.java
* https://github.com/square/retrofit/tree/108fe23964b986107aed352ba467cd2007d15208/retrofit/src/main/java/retrofit2
*/
// returnValue = try {
// invokeSuspendFunction(suspendCoroutineObject) {
//// kotlinx.coroutines.suspendCancellableCoroutine<Any?> { continuation: Any? ->
//// continuation.resume(body)
//// }
//
// } else {
runBlocking {
sendRequest(method, args ?: EMPTY_ARRAY)
//// withContext(Dispatchers.Unconfined) {
// delay(100)
// sendRequest(invokeMethod)
//// }
// }
//
////
//// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable {
//// invokeSuspendFunction(suspendCoroutineObject) {
////
//// }
//// })
//
// } catch (e: Exception) {
// e.printStackTrace()
// }
//
// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) {
// // we were suspend, and when we unsuspend, we will pick up where we left off
// return returnValue
// }
// if this was an exception, we want to get it out!
returnValue = runBlocking {
sendRequest(invokeMethod)
}
} else {
returnValue = runBlocking {
sendRequest(invokeMethod)
}
// }
if (!isAsync) {
return returnValue
}
// if we are async then we return immediately.
// If you want the response value, disable async!
val returnType = method.returnType
if (returnType.isPrimitive) {
return when (returnType) {
Int::class.javaPrimitiveType -> {
0
if (isAsync) {
// if we are async then we return immediately.
// If you want the response value, disable async!
val returnType = method.returnType
if (returnType.isPrimitive) {
return when (returnType) {
Int::class.javaPrimitiveType -> {
0
}
Boolean::class.javaPrimitiveType -> {
java.lang.Boolean.FALSE
}
Float::class.javaPrimitiveType -> {
0.0f
}
Char::class.javaPrimitiveType -> {
0.toChar()
}
Long::class.javaPrimitiveType -> {
0L
}
Short::class.javaPrimitiveType -> {
0.toShort()
}
Byte::class.javaPrimitiveType -> {
0.toByte()
}
Double::class.javaPrimitiveType -> {
0.0
}
else -> {
null
}
}
Boolean::class.javaPrimitiveType -> {
java.lang.Boolean.FALSE
}
return null
}
else {
// this will not return immediately. This will be suspended until there is a response
when (returnValue) {
RmiResponseManager.TIMEOUT_EXCEPTION -> {
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
throw exception
}
Float::class.javaPrimitiveType -> {
0.0f
}
Char::class.javaPrimitiveType -> {
0.toChar()
}
Long::class.javaPrimitiveType -> {
0L
}
Short::class.javaPrimitiveType -> {
0.toShort()
}
Byte::class.javaPrimitiveType -> {
0.toByte()
}
Double::class.javaPrimitiveType -> {
0.0
is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue)
throw returnValue
}
else -> {
null
return returnValue
}
}
}
return null
}
/**
* Force the calling coroutine to suspend before throwing [this].
*
* This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy]
* invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException].
*
* The implementation is derived from:
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
*/
suspend fun suspendAndThrow(e: Throwable): Nothing {
kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
Dispatchers.Default.dispatch(continuation.context, Runnable {
continuation.resumeWithException(e)
})
kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}
}
// trampoline so we can access suspend functions correctly and (if suspend) get the coroutine connection parameter)
private fun invokeSuspendFunction(continuation: Continuation<*>, suspendFunction: suspend () -> Any?): Any {
return SuspendFunctionTrampoline.invoke(continuation, suspendFunction) as Any

View File

@ -87,7 +87,7 @@ internal class RmiManagerForConnections(logger: KLogger,
logger.error("Unable to create remote object!", implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
ConnectionObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId))
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI))
} else {
val rmiId = saveImplObject(implObject)
@ -105,7 +105,7 @@ internal class RmiManagerForConnections(logger: KLogger,
}
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
ConnectionObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId))
}
connection.send(response)

View File

@ -277,8 +277,8 @@ internal class RmiMessageManager(logger: KLogger,
/**
* called on "client"
*/
val rmiId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val rmiId = RmiUtils.unpackRight(message.packedIds)
val callback = removeCallback(callbackId)
onGenericObjectResponse(endPoint, connection, logger, false, rmiId, callback, this, serialization)
}
@ -292,8 +292,8 @@ internal class RmiMessageManager(logger: KLogger,
/**
* called on "client"
*/
val rmiId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val rmiId = RmiUtils.unpackRight(message.packedIds)
val callback = removeCallback(callbackId)
onGenericObjectResponse(endPoint, connection, logger, true, rmiId, callback, this, serialization)
}
@ -308,7 +308,7 @@ internal class RmiMessageManager(logger: KLogger,
val isGlobal = message.isGlobal
val isCoroutine = message.isCoroutine
val rmiObjectId = RmiUtils.unpackLeft(message.packedId)
val rmiId = RmiUtils.unpackRight(message.packedId)
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val cachedMethod = message.cachedMethod
val args = message.args
val sendResponse = rmiId != 1 // async is always with a '1', and we should NOT send a message back if it is '1'
@ -434,7 +434,7 @@ internal class RmiMessageManager(logger: KLogger,
}
private suspend fun returnRmiMessage(connection: Connection, message: MethodRequest, result: Any?, logger: KLogger) {
logger.trace { "RMI returned: ${RmiUtils.unpackRight(message.packedId)}" }
logger.trace { "RMI returned: ${RmiUtils.unpackUnsignedRight(message.packedId)}" }
val rmiMessage = MethodResponse()
rmiMessage.packedId = message.packedId
@ -460,12 +460,12 @@ internal class RmiMessageManager(logger: KLogger,
logger.error("Unable to create remote object!", implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
GlobalObjectCreateResponse(RmiUtils.packShorts(RemoteObjectStorage.INVALID_RMI, callbackId))
GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI))
} else {
val rmiId = saveImplObject(logger, implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))
GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId))
}
connection.send(response)

View File

@ -34,7 +34,6 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
companion object {
val TIMEOUT_EXCEPTION = Exception()
val ASYNC_WAITER = RmiWaiter(1) // this is never waited on, we just need this to optimize how we assigned waiters.
const val MAX = Short.MAX_VALUE.toInt()
}
@ -42,7 +41,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// these are just looped around in a ring buffer.
// These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire
// 64,000 IN FLIGHT RMI method invocations is plenty
private val maxValuesInCache = (MAX * 2) - 1 // -1 because 0 is a reserved number
private val maxValuesInCache = 65535 - 2 // -2 because '0' and '1' are reserved
private val rmiWaiterCache = Channel<RmiWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock()
@ -55,10 +54,13 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
ids.add(id)
}
// MIN (32768) -> -1 (65535)
// 2 (2) -> MAX (32767)
// ZERO is special, and is never added!
// ONE is special, and is used for ASYNC (the response will never be sent back)
for (id in 1..Short.MAX_VALUE) {
for (id in 2..Short.MAX_VALUE) {
ids.add(id)
}
ids.shuffle()
@ -73,15 +75,14 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// resume any pending remote object method invocations (if they are not async, or not manually waiting)
// async RMI will never get here!
suspend fun onMessage(message: MethodResponse) {
val rmiId = RmiUtils.unpackRight(message.packedId)
val adjustedRmiId = rmiId + MAX
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
logger.trace { "RMI return: $rmiId" }
val previous = pendingLock.write {
val previous = pending[adjustedRmiId]
pending[adjustedRmiId] = result
val previous = pending[rmiId]
pending[rmiId] = result
previous
}
@ -112,7 +113,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
responseRmi.prep()
pendingLock.write {
pending[responseRmi.id + MAX] = responseRmi
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
}
responseRmi
@ -127,8 +129,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
return null
}
val rmiId = rmiWaiter.id
val adjustedRmiId = rmiWaiter.id + MAX
@Suppress("EXPERIMENTAL_API_USAGE")
val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
// NOTE: we ALWAYS send a response from the remote end.
//
@ -141,7 +143,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read { pending[adjustedRmiId] }
val maybeResult = pendingLock.read { pending[rmiId] }
if (maybeResult is RmiWaiter) {
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
@ -162,8 +164,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
rmiWaiter.doWait()
val resultOrWaiter = pendingLock.write {
val previous = pending[adjustedRmiId]
pending[adjustedRmiId] = null
val previous = pending[rmiId]
pending[rmiId] = null
previous
}

View File

@ -440,6 +440,10 @@ object RmiUtils {
return packedInt.toShort().toInt()
}
fun unpackUnsignedRight(packedInt: Int): Int {
return packedInt.toUShort().toInt()
}
fun makeFancyMethodName(cachedMethod: CachedMethod): String {
val parameterTypes = cachedMethod.method.parameterTypes
val size = parameterTypes.size
@ -463,4 +467,104 @@ object RmiUtils {
return "${method.declaringClass.name}.${method.name}($args)"
}
//
// suspend fun <T : Any> Call<T>.await(): T {
// return suspendCancellableCoroutine { continuation ->
// continuation.invokeOnCancellation {
// cancel()
// }
// enqueue(object : Callback<T> {
// override fun onResponse(call: Call<T>, response: PingResult.Response<T>) {
// if (response.isSuccessful) {
// val body = response.body()
// if (body == null) {
// val invocation = call.request().tag(Invocation::class.java)!!
// val method = invocation.method()
// val e = KotlinNullPointerException("Response from " +
// method.declaringClass.name +
// '.' +
// method.name +
// " was null but response body type was declared as non-null")
// continuation.resumeWithException(e)
// } else {
// continuation.resume(body)
// }
// } else {
// continuation.resumeWithException(HttpException(response))
// }
// }
//
// override fun onFailure(call: Call<T>, t: Throwable) {
// continuation.resumeWithException(t)
// }
// })
// }
// }
//
// @JvmName("awaitNullable")
// suspend fun <T : Any> Call<T?>.await(): T? {
// return suspendCancellableCoroutine { continuation ->
// continuation.invokeOnCancellation {
// cancel()
// }
// enqueue(object : Callback<T?> {
// override fun onResponse(call: Call<T?>, response: PingResult.Response<T?>) {
// if (response.isSuccessful) {
// continuation.resume(response.body())
// } else {
// continuation.resumeWithException(HttpException(response))
// }
// }
//
// override fun onFailure(call: Call<T?>, t: Throwable) {
// continuation.resumeWithException(t)
// }
// })
// }
// }
//
// suspend fun <T> Call<T>.awaitResponse(): PingResult.Response<T> {
// return suspendCancellableCoroutine { continuation ->
// continuation.invokeOnCancellation {
// cancel()
// }
// enqueue(object : Callback<T> {
// override fun onResponse(call: Call<T>, response: PingResult.Response<T>) {
// continuation.resume(response)
// }
//
// override fun onFailure(call: Call<T>, t: Throwable) {
// continuation.resumeWithException(t)
// }
// })
// }
// }
//
// /**
// * Force the calling coroutine to suspend before throwing [this].
// *
// * This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy]
// * invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException].
// *
// * The implementation is derived from:
// * https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
// */
// suspend fun Exception.suspendAndThrow(): Nothing {
// kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn<Unit> { continuation: Continuation<Unit> ->
// Dispatchers.Default.dispatch(continuation.context, Runnable {
// continuation.intercepted().resumeWithException(this@suspendAndThrow)
// })
//
// kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
// }
// }
}

View File

@ -19,7 +19,7 @@ package dorkbox.network.rmi.messages
/**
* These use packed IDs, because both are REALLY shorts, but the JVM deals better with ints.
*
* @param rmiId (LEFT) the Kryo interface class ID to create
* @param callbackId (RIGHT) to know which callback to use when the object is created
* @param callbackId (LEFT) to know which callback to use when the object is created
* @param rmiId (RIGHT) the Kryo interface class ID to create
*/
data class ConnectionObjectCreateResponse(val packedIds: Int) : RmiMessage

View File

@ -19,7 +19,7 @@ package dorkbox.network.rmi.messages
/**
* These use packed IDs, because both are REALLY shorts, but the JVM deals better with ints.
*
* @param rmiId (LEFT) the Kryo interface class ID to create
* @param callbackId (RIGHT) to know which callback to use when the object is created
* @param callbackId (LEFT) to know which callback to use when the object is created
* @param rmiId (RIGHT) the Kryo interface class ID to create
*/
data class GlobalObjectCreateResponse(val packedIds: Int) : RmiMessage

View File

@ -27,7 +27,7 @@ class RmiPackIdTest {
fun rmiObjectIdNegative() {
// these are SHORTS, so SHORT.MIN -> SHORT.MAX, excluding 0
for (rmiObjectId in Short.MIN_VALUE..-1) {
for (rmiId in 1..Short.MAX_VALUE) {
for (rmiId in 2..Short.MAX_VALUE) {
val packed = RmiUtils.packShorts(rmiObjectId, rmiId)
val rmiObjectId2 = RmiUtils.unpackLeft(packed)
val rmiId2 = RmiUtils.unpackRight(packed)
@ -42,7 +42,7 @@ class RmiPackIdTest {
fun rmiIdNegative() {
// these are SHORTS, so SHORT.MIN -> SHORT.MAX, excluding 0
for (rmiId in Short.MIN_VALUE..-1) {
for (rmiObjectId in 1..Short.MAX_VALUE) {
for (rmiObjectId in 2..Short.MAX_VALUE) {
val packed = RmiUtils.packShorts(rmiObjectId, rmiId)
val rmiObjectId2 = RmiUtils.unpackLeft(packed)
val rmiId2 = RmiUtils.unpackRight(packed)

View File

@ -90,6 +90,17 @@ class RmiTest : BaseTest() {
caught = true
}
Assert.assertTrue(caught)
caught = false
try {
test.throwSuspendException()
} catch (e: UnsupportedOperationException) {
System.err.println("\tExpected exception (exception log should also be on the object impl side).")
e.printStackTrace()
caught = true
}
Assert.assertTrue(caught)
caught = false
// Non-blocking call tests
@ -109,7 +120,6 @@ class RmiTest : BaseTest() {
// exceptions are still dealt with properly
test.moo("Baa")
caught = false
try {
test.throwException()
} catch (e: IllegalStateException) {
@ -119,6 +129,17 @@ class RmiTest : BaseTest() {
}
// exceptions are not caught when async = true!
Assert.assertFalse(caught)
caught = false
try {
test.throwSuspendException()
} catch (e: IllegalStateException) {
System.err.println("\tExpected exception (exception log should also be on the object impl side).")
e.printStackTrace()
caught = true
}
// exceptions are not caught when async = true!
Assert.assertFalse(caught)
// Call will time out if non-blocking isn't working properly

View File

@ -20,5 +20,5 @@ package dorkboxTest.network.rmi.classes
*/
interface TestCowBase {
fun throwException()
suspend fun throwSuspendException()
suspend fun throwSuspendException(): Boolean
}

View File

@ -20,7 +20,7 @@ open class TestCowBaseImpl : TestCowBase {
throw UnsupportedOperationException("Why would I do that?")
}
override suspend fun throwSuspendException() {
override suspend fun throwSuspendException(): Boolean {
throw UnsupportedOperationException("Why would I do that on suspend?")
}