From 594019020bc3aabe2dfca0d727d60ee76ad7b8f9 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 9 Sep 2020 15:23:20 +0200 Subject: [PATCH] Fixed issue,and clarified RMI.timeout = 0 (this will wait forever) --- src/dorkbox/network/rmi/RemoteObject.kt | 7 +- src/dorkbox/network/rmi/RmiClient.kt | 12 ++- src/dorkbox/network/rmi/RmiResponseManager.kt | 95 ++++++++++--------- 3 files changed, 60 insertions(+), 54 deletions(-) diff --git a/src/dorkbox/network/rmi/RemoteObject.kt b/src/dorkbox/network/rmi/RemoteObject.kt index e6225293..1dcc5cb1 100644 --- a/src/dorkbox/network/rmi/RemoteObject.kt +++ b/src/dorkbox/network/rmi/RemoteObject.kt @@ -47,15 +47,14 @@ interface RemoteObject { var responseTimeout: Int /** - * Sets the behavior when invoking a remote method. DEFAULT is false. + * Sets the behavior when invoking a remote method. DEFAULT is false. This is not thread safe! * * If true, the invoking thread will not wait for a response. The method will return immediately and the return value * should be ignored. * - * If false, the invoking thread will wait (if called via suspend, then it will use coroutines) for the remote method to return or - * timeout. + * If false, the invoking thread will wait for the remote method to return or timeout. * - * If the return value or exception needs to be retrieved, then DO NOT set async, and change the response timeout + * If the return value or an exception needs to be retrieved, then DO NOT set async=true, and change the response timeout to 0 instead */ var async: Boolean diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index 0ac4ebed..2253317e 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -96,9 +96,8 @@ internal class RmiClient(val isGlobal: Boolean, // NOTE: we ALWAYS send a response from the remote end. // // 'async' -> DO NOT WAIT - // 'timeout > 0' -> WAIT - // 'timeout == 0' -> same as async (DO NOT WAIT) - val isAsync = isAsync || timeoutMillis <= 0L + // 'timeout > 0' -> WAIT w/ TIMEOUT + // 'timeout == 0' -> WAIT FOREVER // If we are async, we ignore the response.... @@ -112,7 +111,11 @@ internal class RmiClient(val isGlobal: Boolean, connection.send(invokeMethod) // if we are async, then this will immediately return - return responseManager.waitForReply(isAsync, rmiWaiter, timeoutMillis) + return if (isAsync) { + null + } else { + responseManager.waitForReply(rmiWaiter, timeoutMillis) + } } private fun returnAsyncOrSync(method: Method, returnValue: Any?): Any? { @@ -173,6 +176,7 @@ internal class RmiClient(val isGlobal: Boolean, setResponseTimeoutMethod -> { timeoutMillis = (args!![0] as Int).toLong() + require(timeoutMillis >= 0) { "ResponseTimeout must be >= 0"} return null } getResponseTimeoutMethod -> { diff --git a/src/dorkbox/network/rmi/RmiResponseManager.kt b/src/dorkbox/network/rmi/RmiResponseManager.kt index aad473a5..c60b12ed 100644 --- a/src/dorkbox/network/rmi/RmiResponseManager.kt +++ b/src/dorkbox/network/rmi/RmiResponseManager.kt @@ -133,60 +133,63 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio /** * @return the result (can be null) or timeout exception */ - suspend fun waitForReply(isAsync: Boolean, rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? { - if (isAsync) { - return null - } - - @Suppress("EXPERIMENTAL_API_USAGE") + suspend fun waitForReply(rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? { val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually - // NOTE: we ALWAYS send a response from the remote end. - // - // 'async' -> DO NOT WAIT (and no response) - // 'timeout > 0' -> WAIT - // 'timeout <= 0' -> same as async (DO NOT WAIT) - - - @Suppress("EXPERIMENTAL_API_USAGE") - val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) { - // NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point) - // we want this behavior INSTEAD OF automatically starting this on a new thread. - delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting - - // check if we have a result or not - val maybeResult = pendingLock.read { - val prev = pending[rmiId] - // maybeResult cannot be null, because the only thing that removes it is after the job cancel! - if (prev!!.result == null) { - prev - } else { - null - } - } - - if (maybeResult != null) { - // maybeResult cannot be null, because the only thing that removes it is after the job cancel! - logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } - - maybeResult.cancel() - } - } - logger.trace { "RMI waiting: $rmiId" } - // wait for the response. + // NOTE: we ALWAYS send a response from the remote end. // - // If the response is ALREADY here, the doWait() returns instantly (with result) - // if no response yet, it will suspend and either - // A) get response - // B) timeout - rmiWaiter.doWait() + // 'async' -> DO NOT WAIT + // 'timeout > 0' -> WAIT w/ TIMEOUT + // 'timeout == 0' -> WAIT FOREVER + if (timeoutMillis > 0) { + @Suppress("EXPERIMENTAL_API_USAGE") + val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) { + // NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point) + // we want this behavior INSTEAD OF automatically starting this on a new thread. + delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting + + // check if we have a result or not + val maybeResult = pendingLock.read { + val prev = pending[rmiId] + // maybeResult cannot be null, because the only thing that removes it is after the job cancel! + if (prev!!.result == null) { + prev + } else { + null + } + } + + if (maybeResult != null) { + // maybeResult cannot be null, because the only thing that removes it is after the job cancel! + logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } + + maybeResult.cancel() + } + } + + // wait for the response. + // + // If the response is ALREADY here, the doWait() returns instantly (with result) + // if no response yet, it will suspend and either + // A) get response + // B) timeout + rmiWaiter.doWait() + + // always cancel the timeout + responseTimeoutJob.cancel() + } else { + // wait for the response --- THIS WAITS FOREVER! + // + // If the response is ALREADY here, the doWait() returns instantly (with result) + // if no response yet, it will suspend and + // A) get response + rmiWaiter.doWait() + } - // always cancel the timeout - responseTimeoutJob.cancel() val pendingResult = pendingLock.write { // inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile