Converted the RMI response manager to use blocking instead of suspending calls.

This commit is contained in:
Robinson 2023-09-05 12:59:49 +02:00
parent 3e9a8f9c74
commit 7c326d180c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 41 additions and 87 deletions

View File

@ -49,14 +49,6 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
private val logger: KLogger = KotlinLogging.logger(ResponseManager::class.java.simpleName)
}
private val executor = Executors.newSingleThreadExecutor(
NamedThreadFactory("ResponseManager",
Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true)
)
private val scope = CoroutineScope(executor.asCoroutineDispatcher() + SupervisorJob())
private val rmiWaitersInUse = atomic(0)
private val waiterCache: Pool<ResponseWaiter>
@ -99,12 +91,10 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
// if NULL, since either we don't exist (because we were async), or it was cancelled
if (previous is ResponseWaiter) {
logger.trace { "[RM] valid-cancel: $id" }
logger.trace { "[RM] valid-notify: $id" }
// this means we were NOT timed out! (we cannot be timed out here)
runBlocking {
previous.doNotify()
}
previous.doNotify()
}
}
@ -113,7 +103,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
*
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
*/
fun <T> getWaiterCallback(id: Int, logger: KLogger): T? {
fun <T> removeWaiterCallback(id: Int, logger: KLogger): T? {
logger.trace { "[RM] get-callback: $id" }
val previous = pendingLock.write {
@ -127,6 +117,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
val result = previous.result
// always return this to the cache!
previous.result = null
waiterCache.put(previous)
rmiWaitersInUse.getAndDecrement()
@ -146,7 +137,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
rmiWaitersInUse.getAndIncrement()
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
// this will initialize the result
waiter.prep()
pendingLock.write {
@ -166,7 +157,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
rmiWaitersInUse.getAndIncrement()
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
// this will initialize the result
waiter.prep()
// assign the callback that will be notified when the return message is received
@ -182,26 +173,6 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
}
/**
* Cancels the RMI request in the given timeout, the callback is executed inside the read lock
*/
fun cancelRequest(timeoutMillis: Long, id: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) {
scope.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
pendingLock.read {
val maybeResult = pending[id]
if (maybeResult is ResponseWaiter) {
logger.trace { "[RM] timeout ($timeoutMillis) with callback cancel: $id" }
maybeResult.cancel()
onCancelled(maybeResult)
}
}
}
}
/**
* We only wait for a reply if we are SYNC.
*
@ -225,39 +196,23 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) {
val responseTimeoutJob = scope.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read { pending[id] }
if (maybeResult is ResponseWaiter) {
logger.trace { "[RM] timeout ($timeoutMillis) cancel: $id" }
maybeResult.cancel()
}
}
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and either
// if no response yet, it will wait for:
// A) get response
// B) timeout
runBlocking {
responseWaiter.doWait()
}
responseWaiter.doWait(timeoutMillis)
// always cancel the timeout
responseTimeoutJob.cancel()
// if we timeout, it doesn't matter since we'll be removing the waiter from the array anyways,
// so no signal can occur, or a signal won't matter
} else {
// wait for the response --- THIS WAITS FOREVER (there is no timeout)!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and
// if no response yet, it will wait for one
// A) get response
runBlocking {
responseWaiter.doWait()
}
responseWaiter.doWait()
}
@ -269,6 +224,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
}
// always return the waiter to the cache
responseWaiter.result = null
waiterCache.put(responseWaiter)
rmiWaitersInUse.getAndDecrement()
@ -299,8 +255,5 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
pending[index] = null
}
}
scope.cancel("Closing the response manager for RMI")
executor.awaitTermination(500, TimeUnit.MILLISECONDS)
}
}

View File

@ -15,56 +15,57 @@
*/
package dorkbox.network.rmi
import kotlinx.coroutines.channels.Channel
import kotlinx.atomicfu.locks.withLock
import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
data class ResponseWaiter(val id: Int) {
// this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting
// https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified
// https://kotlinlang.org/docs/reference/coroutines/channels.html
// "receive' suspends until another coroutine invokes "send"
// and
// "send" suspends until another coroutine invokes "receive".
//
// these are wrapped in a try/catch, because cancel will cause exceptions to be thrown (which we DO NOT want)
@Volatile
var channel: Channel<Unit> = Channel(Channel.RENDEZVOUS)
@Volatile
var isCancelled = false
private val lock = ReentrantLock()
private val condition = lock.newCondition()
// holds the RMI result or callback. This is ALWAYS accessed from within a lock (so no synchronize/volatile/etc necessary)!
@Volatile
var result: Any? = null
/**
* this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
* this will set the result to null
*/
fun prep() {
if (isCancelled) {
isCancelled = false
channel = Channel(0)
}
result = null
}
suspend fun doNotify() {
/**
* Waits until another thread invokes "doWait"
*/
fun doNotify() {
try {
channel.send(Unit)
lock.withLock {
condition.signal()
}
} catch (ignored: Throwable) {
}
}
suspend fun doWait() {
/**
* Waits a specific amount of time until another thread invokes "doNotify"
*/
fun doWait() {
try {
channel.receive()
lock.withLock {
condition.await()
}
} catch (ignored: Throwable) {
}
}
fun cancel() {
/**
* Waits a specific amount of time until another thread invokes "doNotify"
*/
fun doWait(timeout: Long) {
try {
isCancelled = true
channel.cancel()
lock.withLock {
condition.await(timeout, TimeUnit.MILLISECONDS)
}
} catch (ignored: Throwable) {
}
}