diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index 41927f2b..fb14005f 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -115,7 +115,7 @@ internal class RmiManagerGlobal(logger: KLogger, return success as T? } - fun close() { + suspend fun close() { rmiResponseManager.close() remoteObjectCreationCallbacks.close() } diff --git a/src/dorkbox/network/rmi/RmiResponseManager.kt b/src/dorkbox/network/rmi/RmiResponseManager.kt index c6032ed1..aad473a5 100644 --- a/src/dorkbox/network/rmi/RmiResponseManager.kt +++ b/src/dorkbox/network/rmi/RmiResponseManager.kt @@ -16,7 +16,9 @@ package dorkbox.network.rmi import dorkbox.network.rmi.messages.MethodResponse +import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -44,10 +46,11 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // 0 is reserved for INVALID // 1 is reserved for ASYNC private val maxValuesInCache = 65535 + private val rmiWaitersInUse = atomic(0) private val rmiWaiterCache = Channel(maxValuesInCache) private val pendingLock = ReentrantReadWriteLock() - private val pending = arrayOfNulls(maxValuesInCache) + private val pending = arrayOfNulls(maxValuesInCache) init { // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! @@ -67,9 +70,11 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio } ids.shuffle() - // populate the array of randomly assigned ID's + waiters. - for (it in ids) { - rmiWaiterCache.offer(RmiWaiter(it)) + // populate the array of randomly assigned ID's + waiters. This can happen in a new thread + actionDispatch.launch { + for (it in ids) { + rmiWaiterCache.offer(RmiWaiter(it)) + } } } @@ -82,22 +87,22 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio logger.trace { "RMI return: $rmiId" } - val previous = pendingLock.write { + val prev = pendingLock.write { val previous = pending[rmiId] - pending[rmiId] = result - previous + if (previous != null) { + // if NULL, since either we don't exist (because we were async), or it was cancelled + logger.trace { "RMI valid-cancel: $rmiId" } + + // this means we were NOT timed out! + previous.result = result + previous + } else { + null + } } // if NULL, since either we don't exist (because we were async), or it was cancelled - if (previous is RmiWaiter) { - logger.trace { "RMI valid-cancel: $rmiId" } - - // this means we were NOT timed out! (we cannot be timed out here) - previous.doNotify() - - // since this was the FIRST one to trigger, return it to the cache. - rmiWaiterCache.send(previous) - } + prev?.doNotify() } /** @@ -110,6 +115,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio ASYNC_WAITER } else { val responseRmi = rmiWaiterCache.receive() + rmiWaitersInUse.getAndIncrement() + logger.trace { "RMI count: ${rmiWaitersInUse.value}" } // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) responseRmi.prep() @@ -141,12 +148,25 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // 'timeout <= 0' -> same as async (DO NOT WAIT) - val responseTimeoutJob = actionDispatch.launch { + @Suppress("EXPERIMENTAL_API_USAGE") + val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) { + // NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point) + // we want this behavior INSTEAD OF automatically starting this on a new thread. delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting // check if we have a result or not - val maybeResult = pendingLock.read { pending[rmiId] } - if (maybeResult is RmiWaiter) { + val maybeResult = pendingLock.read { + val prev = pending[rmiId] + // maybeResult cannot be null, because the only thing that removes it is after the job cancel! + if (prev!!.result == null) { + prev + } else { + null + } + } + + if (maybeResult != null) { + // maybeResult cannot be null, because the only thing that removes it is after the job cancel! logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } maybeResult.cancel() @@ -165,32 +185,45 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio // B) timeout rmiWaiter.doWait() - val resultOrWaiter = pendingLock.write { - val previous = pending[rmiId] - pending[rmiId] = null - previous - } - // always cancel the timeout responseTimeoutJob.cancel() - if (resultOrWaiter is RmiWaiter) { + val pendingResult = pendingLock.write { + // inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile + val previous = pending[rmiId] + pending[rmiId] = null + + // previous cannot be null, because we are the only thing that removes it! + val prevResult = previous!!.result + previous.result = null + prevResult + } + + // always return the waiter to the cache + rmiWaiterCache.send(rmiWaiter) + rmiWaitersInUse.getAndDecrement() + + if (pendingResult == null) { logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" } - // since this was the FIRST one to trigger, return it to the cache. - rmiWaiterCache.send(resultOrWaiter) return TIMEOUT_EXCEPTION } - return resultOrWaiter + return pendingResult } - fun close() { + suspend fun close() { + // wait for responses, or wait for timeouts! + while (rmiWaitersInUse.value > 0) { + delay(100) + } + + rmiWaiterCache.close() + pendingLock.write { pending.forEachIndexed { index, _ -> pending[index] = null } } - rmiWaiterCache.close() } } diff --git a/src/dorkbox/network/rmi/RmiWaiter.kt b/src/dorkbox/network/rmi/RmiWaiter.kt index 4db04a7b..8c4cd798 100644 --- a/src/dorkbox/network/rmi/RmiWaiter.kt +++ b/src/dorkbox/network/rmi/RmiWaiter.kt @@ -30,6 +30,9 @@ internal data class RmiWaiter(val id: Int) { var channel: Channel = Channel(0) var isCancelled = false + // holds the RMI result. This is ALWAYS accessed from within a lock! + var result: Any? = null + /** * this will replace the waiter if it was cancelled (waiters are not valid if cancelled)