diff --git a/src/dorkbox/network/rmi/RmiWaiter.kt b/src/dorkbox/network/rmi/ResponseWaiter.kt similarity index 97% rename from src/dorkbox/network/rmi/RmiWaiter.kt rename to src/dorkbox/network/rmi/ResponseWaiter.kt index 381b61f3..a3b58429 100644 --- a/src/dorkbox/network/rmi/RmiWaiter.kt +++ b/src/dorkbox/network/rmi/ResponseWaiter.kt @@ -17,7 +17,7 @@ package dorkbox.network.rmi import kotlinx.coroutines.channels.Channel -internal data class RmiWaiter(val id: Int) { +internal data class ResponseWaiter(val id: Int) { // this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting // https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified // https://kotlinlang.org/docs/reference/coroutines/channels.html diff --git a/src/dorkbox/network/rmi/RmiResponseManager.kt b/src/dorkbox/network/rmi/RmiResponseManager.kt index e877ab69..24679a2b 100644 --- a/src/dorkbox/network/rmi/RmiResponseManager.kt +++ b/src/dorkbox/network/rmi/RmiResponseManager.kt @@ -35,7 +35,7 @@ import kotlin.concurrent.write internal class RmiResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) { companion object { val TIMEOUT_EXCEPTION = Exception() - val ASYNC_WAITER = RmiWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters. + val ASYNC_WAITER = ResponseWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters. } @@ -47,7 +47,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // 1 is reserved for ASYNC private val maxValuesInCache = 65535 private val rmiWaitersInUse = atomic(0) - private val rmiWaiterCache = Channel(maxValuesInCache) + private val rmiWaiterCache = Channel(maxValuesInCache) private val pendingLock = ReentrantReadWriteLock() private val pending = arrayOfNulls(maxValuesInCache+1) // +1 because it's possible to have the value 65535 in the cache @@ -73,7 +73,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // populate the array of randomly assigned ID's + waiters. This can happen in a new thread actionDispatch.launch { for (it in ids) { - rmiWaiterCache.offer(RmiWaiter(it)) + rmiWaiterCache.offer(ResponseWaiter(it)) } } } @@ -94,7 +94,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio } // if NULL, since either we don't exist (because we were async), or it was cancelled - if (previous is RmiWaiter) { + if (previous is ResponseWaiter) { logger.trace { "RMI valid-cancel: $rmiId" } // this means we were NOT timed out! (we cannot be timed out here) @@ -107,7 +107,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio * * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. */ - internal suspend fun prep(isAsync: Boolean): RmiWaiter { + internal suspend fun prep(isAsync: Boolean): ResponseWaiter { return if (isAsync) { ASYNC_WAITER } else { @@ -130,8 +130,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio /** * @return the result (can be null) or timeout exception */ - suspend fun waitForReply(rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? { - val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually + suspend fun waitForReply(responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? { + val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually logger.trace { "RMI waiting: $rmiId" @@ -151,7 +151,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // check if we have a result or not val maybeResult = pendingLock.read { pending[rmiId] } - if (maybeResult is RmiWaiter) { + if (maybeResult is ResponseWaiter) { logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } maybeResult.cancel() @@ -164,7 +164,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // if no response yet, it will suspend and either // A) get response // B) timeout - rmiWaiter.doWait() + responseWaiter.doWait() // always cancel the timeout responseTimeoutJob.cancel() @@ -174,7 +174,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // If the response is ALREADY here, the doWait() returns instantly (with result) // if no response yet, it will suspend and // A) get response - rmiWaiter.doWait() + responseWaiter.doWait() } @@ -185,10 +185,10 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio } // always return the waiter to the cache - rmiWaiterCache.send(rmiWaiter) + rmiWaiterCache.send(responseWaiter) rmiWaitersInUse.getAndDecrement() - if (resultOrWaiter is RmiWaiter) { + if (resultOrWaiter is ResponseWaiter) { logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" } return TIMEOUT_EXCEPTION