diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index dec55f14..9442d14f 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -319,7 +319,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A * from a "global" context */ internal open fun getRmiConnectionSupport() : RmiManagerConnections { - return RmiManagerConnections(logger, rmiGlobalSupport, serialization, actionDispatch) + return RmiManagerConnections(logger, rmiGlobalSupport, serialization) } /** diff --git a/src/dorkbox/network/other/coroutines/SuspendWaiter.kt b/src/dorkbox/network/other/coroutines/SuspendWaiter.kt deleted file mode 100644 index 795fc253..00000000 --- a/src/dorkbox/network/other/coroutines/SuspendWaiter.kt +++ /dev/null @@ -1,34 +0,0 @@ -package dorkbox.network.other.coroutines - -import kotlinx.coroutines.channels.Channel - -// this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting -// https://kotlinlang.org/docs/reference/coroutines/channels.html -inline class SuspendWaiter(private val channel: Channel = Channel()) { - // "receive' suspends until another coroutine invokes "send" - // and - // "send" suspends until another coroutine invokes "receive". - suspend fun doWait() { - try { - channel.receive() - } catch (ignored: Exception) { - } - } - suspend fun doNotify() { - try { - channel.send(Unit) - } catch (ignored: Exception) { - } - } - fun cancel() { - try { - channel.cancel() - } catch (ignored: Exception) { - } - } - fun isCancelled(): Boolean { - // once the channel is cancelled, it can never work again - @Suppress("EXPERIMENTAL_API_USAGE") - return channel.isClosedForReceive && channel.isClosedForSend - } -} diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index e66952aa..d7b6954d 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -17,10 +17,8 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection import dorkbox.network.connection.ListenerManager -import dorkbox.network.other.coroutines.SuspendFunctionTrampoline import dorkbox.network.rmi.messages.MethodRequest import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import java.lang.reflect.InvocationHandler import java.lang.reflect.Method @@ -114,11 +112,53 @@ internal class RmiClient(val isGlobal: Boolean, connection.send(invokeMethod) - // if we are async, then this will immediately return return responseManager.waitForReply(isAsync, rmiWaiter, timeoutMillis) } + private fun returnAsyncOrSync(method: Method, returnValue: Any?): Any? { + if (isAsync) { + // if we are async then we return immediately. + // If you want the response value, disable async! + val returnType = method.returnType + if (returnType.isPrimitive) { + return when (returnType) { + Int::class.javaPrimitiveType -> { + 0 + } + Boolean::class.javaPrimitiveType -> { + java.lang.Boolean.FALSE + } + Float::class.javaPrimitiveType -> { + 0.0f + } + Char::class.javaPrimitiveType -> { + 0.toChar() + } + Long::class.javaPrimitiveType -> { + 0L + } + Short::class.javaPrimitiveType -> { + 0.toShort() + } + Byte::class.javaPrimitiveType -> { + 0.toByte() + } + Double::class.javaPrimitiveType -> { + 0.0 + } + else -> { + null + } + } + } + return null + } + else { + return returnValue + } + } + @Suppress("DuplicatedCode") /** * @throws Exception @@ -195,152 +235,47 @@ internal class RmiClient(val isGlobal: Boolean, - - // if a 'suspend' function is called, then our last argument is a 'Continuation' object // We will use this for our coroutine context instead of running on a new coroutine - val suspendCoroutineObject = args?.lastOrNull() + val suspendCoroutineArg = args?.lastOrNull() // async will return immediately - var returnValue: Any? = null - if (suspendCoroutineObject is Continuation<*>) { -// val continuation = suspendCoroutineObject as Continuation -// -// -// val suspendFunction: suspend () -> Any? = { -// val rmiResult = sendRequest(invokeMethod) -// println("RMI: ${rmiResult?.javaClass}") -// println(1) -// delay(3000) -// println(2) -// } -// val suspendFunction1: Function1, *> = suspendFunction as Function1?, *> -// returnValue = suspendFunction1.invoke(Continuation(EmptyCoroutineContext) { -// it.getOrNull().apply { -// continuation.resume(this) -// } -// }) -// -// System.err.println("have suspend ret value ${returnValue?.javaClass}") -// -//// returnValue = invokeSuspendFunction(continuation, suspendFunction) -// -// // https://stackoverflow.com/questions/57230869/how-to-propagate-kotlin-coroutine-context-through-reflective-invocation-of-suspe -// // https://stackoverflow.com/questions/52869672/call-kotlin-suspend-function-in-java-class -// // https://discuss.kotlinlang.org/t/how-to-continue-a-suspend-function-in-a-dynamic-proxy-in-the-same-coroutine/11391 -// -// -// // NOTE: -// // Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes -// // invoke the supplied callback with an exception before the invoking stack frame can return. -// // Coroutines will intercept the subsequent invocation of the Continuation and throw the -// // exception synchronously. A Java Proxy cannot throw checked exceptions without them being -// // declared on the interface method. To avoid the synchronous checked exception being wrapped -// // in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will -// // force suspension to occur so that it can be instead delivered to the continuation to -// // bypass this restriction. -// -// /** -// * https://jakewharton.com/exceptions-and-proxies-and-coroutines-oh-my/ -// * https://github.com/Kotlin/kotlinx.coroutines/pull/1667 -// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/KotlinExtensions.kt -// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/HttpServiceMethod.java -// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2/Utils.java -// * https://github.com/square/retrofit/blob/master/retrofit/src/main/java/retrofit2 -// */ -//// returnValue = try { -//// val actualContinuation = suspendCoroutineObject.intercepted() as Continuation -////// suspend { -////// try { -////// delay(100) -////// sendRequest(invokeMethod) -////// } catch (e: Exception) { -////// yield() -////// throw e -////// } -////// }.startCoroutineUninterceptedOrReturn(actualContinuation) -//// -//// invokeSuspendFunction(actualContinuation) { -//// -//// -//////// kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn { -//// delay(100) -//// sendRequest(invokeMethod) -//////// } -//////// -////// kotlinx.coroutines.suspendCancellableCoroutine { continuation: Any? -> -////// resume(body) -////// } -////// withContext(MyUnconfined) { -////// -////// } -//// } -//// -////// MyUnconfined.dispatch(suspendCoroutineObject.context, Runnable { -////// invokeSuspendFunction(suspendCoroutineObject) { -////// -////// } -////// }) -//// -//// } catch (e: Exception) { -//// e.printStackTrace() -//// } -//// -//// if (returnValue == kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) { -//// // we were suspend, and when we unsuspend, we will pick up where we left off -//// return returnValue -//// } + if (suspendCoroutineArg is Continuation<*>) { + @Suppress("UNCHECKED_CAST") + val continuation = suspendCoroutineArg as Continuation - // if this was an exception, we want to get it out! - returnValue = runBlocking { + val suspendFunction: suspend () -> Any? = { sendRequest(invokeMethod) } - } else { - returnValue = runBlocking { - sendRequest(invokeMethod) - } - } - if (isAsync) { - // if we are async then we return immediately. - // If you want the response value, disable async! - val returnType = method.returnType - if (returnType.isPrimitive) { - return when (returnType) { - Int::class.javaPrimitiveType -> { - 0 + // function suspension works differently !! + @Suppress("UNCHECKED_CAST") + return (suspendFunction as Function1, Any?>).invoke(Continuation(EmptyCoroutineContext) { + val any = it.getOrNull() + when (any) { + RmiResponseManager.TIMEOUT_EXCEPTION -> { + val fancyName = RmiUtils.makeFancyMethodName(method) + val exception = TimeoutException("Response timed out: $fancyName") + // from top down, clean up the coroutine stack + ListenerManager.cleanStackTrace(exception, RmiClient::class.java) + continuation.resumeWithException(exception) } - Boolean::class.javaPrimitiveType -> { - java.lang.Boolean.FALSE - } - Float::class.javaPrimitiveType -> { - 0.0f - } - Char::class.javaPrimitiveType -> { - 0.toChar() - } - Long::class.javaPrimitiveType -> { - 0L - } - Short::class.javaPrimitiveType -> { - 0.toShort() - } - Byte::class.javaPrimitiveType -> { - 0.toByte() - } - Double::class.javaPrimitiveType -> { - 0.0 + is Exception -> { + // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call + // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) + ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, any) + continuation.resumeWithException(any) } else -> { - null + continuation.resume(returnAsyncOrSync(method, any)) } } + }) + } else { + val any = runBlocking { + sendRequest(invokeMethod) } - return null - } - else { - // this will not return immediately. This will be suspended until there is a response - when (returnValue) { + when (any) { RmiResponseManager.TIMEOUT_EXCEPTION -> { val fancyName = RmiUtils.makeFancyMethodName(method) val exception = TimeoutException("Response timed out: $fancyName") @@ -348,48 +283,19 @@ internal class RmiClient(val isGlobal: Boolean, ListenerManager.cleanStackTrace(exception, RmiClient::class.java) throw exception } -// is Exception -> { -// // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call -// // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) -// ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, returnValue) -// throw returnValue -// } + is Exception -> { + // reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call + // this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site) + ListenerManager.cleanStackTrace(Exception(), RmiClient::class.java, any) + throw any + } else -> { - return returnValue + return returnAsyncOrSync(method, any) } } } } - - /** - * Force the calling coroutine to suspend before throwing [this]. - * - * This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy] - * invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException]. - * - * The implementation is derived from: - * https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349 - */ - suspend fun suspendAndThrow(e: Throwable): Nothing { - kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn { continuation -> - Dispatchers.Default.dispatch(continuation.context, Runnable { - continuation.resumeWithException(e) - }) - kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED - } - } - - - // trampoline so we can access suspend functions correctly and (if suspend) get the coroutine connection parameter) - private fun invokeSuspendFunction(continuation: Continuation, suspendFunction: suspend () -> Any?): Any { - return SuspendFunctionTrampoline.invoke(Continuation(EmptyCoroutineContext) { - it.getOrNull().apply { - continuation.resume(this) - } - }, suspendFunction) as Any - } - override fun hashCode(): Int { val prime = 31 var result = 1 diff --git a/test/dorkboxTest/network/rmi/RmiTest.kt b/test/dorkboxTest/network/rmi/RmiTest.kt index b4e705a5..a5fdbb22 100644 --- a/test/dorkboxTest/network/rmi/RmiTest.kt +++ b/test/dorkboxTest/network/rmi/RmiTest.kt @@ -99,10 +99,10 @@ class RmiTest : BaseTest() { e.printStackTrace() caught = true } + Assert.assertTrue(caught) caught = false - // Non-blocking call tests // Non-blocking call tests // Non-blocking call tests @@ -140,6 +140,7 @@ class RmiTest : BaseTest() { } // exceptions are not caught when async = true! Assert.assertFalse(caught) + caught = false // Call will time out if non-blocking isn't working properly