Fixed issues when RMI timeout happens

This commit is contained in:
nathan 2020-09-10 02:05:14 +02:00
parent c9f74c162d
commit 030387c378

View File

@ -50,7 +50,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
private val rmiWaiterCache = Channel<RmiWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock()
private val pending = arrayOfNulls<RmiWaiter>(maxValuesInCache)
private val pending = arrayOfNulls<Any?>(maxValuesInCache)
init {
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
@ -87,22 +87,19 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
logger.trace { "RMI return: $rmiId" }
val prev = pendingLock.write {
val previous = pendingLock.write {
val previous = pending[rmiId]
if (previous != null) {
// if NULL, since either we don't exist (because we were async), or it was cancelled
logger.trace { "RMI valid-cancel: $rmiId" }
// this means we were NOT timed out!
previous.result = result
previous
} else {
null
}
pending[rmiId] = result
previous
}
// if NULL, since either we don't exist (because we were async), or it was cancelled
prev?.doNotify()
if (previous is RmiWaiter) {
logger.trace { "RMI valid-cancel: $rmiId" }
// this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify()
}
}
/**
@ -153,18 +150,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read {
val prev = pending[rmiId]
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
if (prev!!.result == null) {
prev
} else {
null
}
}
if (maybeResult != null) {
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
val maybeResult = pendingLock.read { pending[rmiId] }
if (maybeResult is RmiWaiter) {
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
maybeResult.cancel()
@ -182,7 +169,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// always cancel the timeout
responseTimeoutJob.cancel()
} else {
// wait for the response --- THIS WAITS FOREVER!
// wait for the response --- THIS WAITS FOREVER (there is no timeout)!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and
@ -191,28 +178,23 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
}
val pendingResult = pendingLock.write {
// inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile
val resultOrWaiter = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = null
// previous cannot be null, because we are the only thing that removes it!
val prevResult = previous!!.result
previous.result = null
prevResult
previous
}
// always return the waiter to the cache
rmiWaiterCache.send(rmiWaiter)
rmiWaitersInUse.getAndDecrement()
if (pendingResult == null) {
if (resultOrWaiter is RmiWaiter) {
logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" }
return TIMEOUT_EXCEPTION
}
return pendingResult
return resultOrWaiter
}
suspend fun close() {