WIP suspending coroutine calls

This commit is contained in:
nathan 2020-08-19 14:50:51 +02:00
parent 0c9949225f
commit f2704218f2
3 changed files with 55 additions and 35 deletions

View File

@ -16,6 +16,7 @@
package dorkbox.network.rmi package dorkbox.network.rmi
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.ListenerManager
import dorkbox.network.other.coroutines.SuspendFunctionTrampoline import dorkbox.network.other.coroutines.SuspendFunctionTrampoline
import dorkbox.network.rmi.messages.MethodRequest import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
@ -112,39 +113,27 @@ internal class RmiClient(val isGlobal: Boolean,
// if we are async, then this will immediately return // if we are async, then this will immediately return
@Suppress("MoveVariableDeclarationIntoWhen")
val result = responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis) val result = responseStorage.waitForReply(isAsync, rmiWaiter, timeoutMillis)
when (result) { when (result) {
RmiResponseManager.TIMEOUT_EXCEPTION -> { RmiResponseManager.TIMEOUT_EXCEPTION -> {
// TODO: from top down, clean up the coroutine stack val fancyName = RmiUtils.makeFancyMethodName(method)
throw TimeoutException("Response timed out: ${method.declaringClass.name}.${method.name}(${method.parameterTypes.map{it.simpleName}})") val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
throw exception
} }
is Exception -> { is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, result)
val stackTrace = Exception().stackTrace
val myClassName = RmiClient::class.java.name
var newStartIndex = 0
for (element in stackTrace) {
newStartIndex++
if (element.className == myClassName && element.methodName == "invoke") {
// we do this 1 more time, because we want to remove the proxy invocation off the stack as well.
newStartIndex++
break
}
}
val newStack = Array<StackTraceElement>(result.stackTrace.size + stackTrace.size - newStartIndex) { stackTrace[0] }
result.stackTrace.copyInto(newStack)
stackTrace.copyInto(newStack, result.stackTrace.size, newStartIndex)
result.stackTrace = newStack
throw result throw result
} }
else -> { else -> {
// val fancyName = RmiUtils.makeFancyMethodName(method)
// val exception = TimeoutException("Response timed out: $fancyName")
// // from top down, clean up the coroutine stack
// ListenerManager.cleanStackTrace(exception, RmiClient::class.java)
// throw exception
return result return result
} }
} }
@ -218,15 +207,22 @@ internal class RmiClient(val isGlobal: Boolean,
val maybeContinuation = args?.lastOrNull() val maybeContinuation = args?.lastOrNull()
// async will return immediately // async will return immediately
val returnValue = if (maybeContinuation is Continuation<*>) { val returnValue =
invokeSuspendFunction(maybeContinuation) { // if (maybeContinuation is Continuation<*>) {
sendRequest(method, args) // try {
} // invokeSuspendFunction(maybeContinuation) {
} else { // sendRequest(method, args)
// }
// } catch (e: Exception) {
// println("EXCEPT!")
// }
// // if this was an exception, we want to get it out!
//
// } else {
runBlocking { runBlocking {
sendRequest(method, args ?: EMPTY_ARRAY) sendRequest(method, args ?: EMPTY_ARRAY)
} }
} // }
if (!isAsync) { if (!isAsync) {

View File

@ -375,9 +375,9 @@ internal class RmiMessageManager(logger: KLogger,
insideResult.initCause(null) insideResult.initCause(null)
} }
ListenerManager.cleanStackTrace(insideResult as Throwable) ListenerManager.cleanStackTraceReverse(insideResult as Throwable)
logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", val fancyName = RmiUtils.makeFancyMethodName(cachedMethod)
insideResult) logger.error("Error invoking method: $fancyName", insideResult)
} }
insideResult insideResult
} }
@ -415,9 +415,9 @@ internal class RmiMessageManager(logger: KLogger,
result.initCause(null) result.initCause(null)
} }
ListenerManager.cleanStackTrace(result as Throwable) ListenerManager.cleanStackTraceReverse(result as Throwable)
logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", val fancyName = RmiUtils.makeFancyMethodName(cachedMethod)
result) logger.error("Error invoking method: $fancyName", result)
} }
if (sendResponse) { if (sendResponse) {

View File

@ -438,4 +438,28 @@ object RmiUtils {
fun unpackRight(packedInt: Int): Int { fun unpackRight(packedInt: Int): Int {
return packedInt.toShort().toInt() return packedInt.toShort().toInt()
} }
fun makeFancyMethodName(cachedMethod: CachedMethod): String {
val parameterTypes = cachedMethod.method.parameterTypes
val size = parameterTypes.size
val args: String = if (size == 0 || parameterTypes[size - 1] == Continuation::class.java) {
""
} else {
parameterTypes.joinToString { it.simpleName }
}
return "${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}($args)"
}
fun makeFancyMethodName(method: Method): String {
val parameterTypes = method.parameterTypes
val size = parameterTypes.size
val args: String = if (size != 0 || parameterTypes[size - 1] == Continuation::class.java) {
""
} else {
parameterTypes.joinToString { it.simpleName }
}
return "${method.declaringClass.name}.${method.name}($args)"
}
} }