Fixed issue,and clarified RMI.timeout = 0 (this will wait forever)

This commit is contained in:
nathan 2020-09-09 15:23:20 +02:00
parent ca88b9941f
commit 594019020b
3 changed files with 60 additions and 54 deletions

View File

@ -47,15 +47,14 @@ interface RemoteObject {
var responseTimeout: Int var responseTimeout: Int
/** /**
* Sets the behavior when invoking a remote method. DEFAULT is false. * Sets the behavior when invoking a remote method. DEFAULT is false. This is not thread safe!
* *
* If true, the invoking thread will not wait for a response. The method will return immediately and the return value * If true, the invoking thread will not wait for a response. The method will return immediately and the return value
* should be ignored. * should be ignored.
* *
* If false, the invoking thread will wait (if called via suspend, then it will use coroutines) for the remote method to return or * If false, the invoking thread will wait for the remote method to return or timeout.
* timeout.
* *
* If the return value or exception needs to be retrieved, then DO NOT set async, and change the response timeout * If the return value or an exception needs to be retrieved, then DO NOT set async=true, and change the response timeout to 0 instead
*/ */
var async: Boolean var async: Boolean

View File

@ -96,9 +96,8 @@ internal class RmiClient(val isGlobal: Boolean,
// NOTE: we ALWAYS send a response from the remote end. // NOTE: we ALWAYS send a response from the remote end.
// //
// 'async' -> DO NOT WAIT // 'async' -> DO NOT WAIT
// 'timeout > 0' -> WAIT // 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> same as async (DO NOT WAIT) // 'timeout == 0' -> WAIT FOREVER
val isAsync = isAsync || timeoutMillis <= 0L
// If we are async, we ignore the response.... // If we are async, we ignore the response....
@ -112,7 +111,11 @@ internal class RmiClient(val isGlobal: Boolean,
connection.send(invokeMethod) connection.send(invokeMethod)
// if we are async, then this will immediately return // if we are async, then this will immediately return
return responseManager.waitForReply(isAsync, rmiWaiter, timeoutMillis) return if (isAsync) {
null
} else {
responseManager.waitForReply(rmiWaiter, timeoutMillis)
}
} }
private fun returnAsyncOrSync(method: Method, returnValue: Any?): Any? { private fun returnAsyncOrSync(method: Method, returnValue: Any?): Any? {
@ -173,6 +176,7 @@ internal class RmiClient(val isGlobal: Boolean,
setResponseTimeoutMethod -> { setResponseTimeoutMethod -> {
timeoutMillis = (args!![0] as Int).toLong() timeoutMillis = (args!![0] as Int).toLong()
require(timeoutMillis >= 0) { "ResponseTimeout must be >= 0"}
return null return null
} }
getResponseTimeoutMethod -> { getResponseTimeoutMethod -> {

View File

@ -133,60 +133,63 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
/** /**
* @return the result (can be null) or timeout exception * @return the result (can be null) or timeout exception
*/ */
suspend fun waitForReply(isAsync: Boolean, rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? { suspend fun waitForReply(rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? {
if (isAsync) {
return null
}
@Suppress("EXPERIMENTAL_API_USAGE")
val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
// NOTE: we ALWAYS send a response from the remote end.
//
// 'async' -> DO NOT WAIT (and no response)
// 'timeout > 0' -> WAIT
// 'timeout <= 0' -> same as async (DO NOT WAIT)
@Suppress("EXPERIMENTAL_API_USAGE")
val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point)
// we want this behavior INSTEAD OF automatically starting this on a new thread.
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read {
val prev = pending[rmiId]
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
if (prev!!.result == null) {
prev
} else {
null
}
}
if (maybeResult != null) {
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
maybeResult.cancel()
}
}
logger.trace { logger.trace {
"RMI waiting: $rmiId" "RMI waiting: $rmiId"
} }
// wait for the response. // NOTE: we ALWAYS send a response from the remote end.
// //
// If the response is ALREADY here, the doWait() returns instantly (with result) // 'async' -> DO NOT WAIT
// if no response yet, it will suspend and either // 'timeout > 0' -> WAIT w/ TIMEOUT
// A) get response // 'timeout == 0' -> WAIT FOREVER
// B) timeout if (timeoutMillis > 0) {
rmiWaiter.doWait() @Suppress("EXPERIMENTAL_API_USAGE")
val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point)
// we want this behavior INSTEAD OF automatically starting this on a new thread.
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read {
val prev = pending[rmiId]
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
if (prev!!.result == null) {
prev
} else {
null
}
}
if (maybeResult != null) {
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
maybeResult.cancel()
}
}
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and either
// A) get response
// B) timeout
rmiWaiter.doWait()
// always cancel the timeout
responseTimeoutJob.cancel()
} else {
// wait for the response --- THIS WAITS FOREVER!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and
// A) get response
rmiWaiter.doWait()
}
// always cancel the timeout
responseTimeoutJob.cancel()
val pendingResult = pendingLock.write { val pendingResult = pendingLock.write {
// inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile // inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile