From 340dc4a36cb22d8b9f346a56c27c5066faf9382d Mon Sep 17 00:00:00 2001 From: Robinson Date: Mon, 27 Feb 2023 11:09:33 +0100 Subject: [PATCH] Cleaned API and logging --- src/dorkbox/network/rmi/ResponseManager.kt | 92 ++++++++++------------ 1 file changed, 42 insertions(+), 50 deletions(-) diff --git a/src/dorkbox/network/rmi/ResponseManager.kt b/src/dorkbox/network/rmi/ResponseManager.kt index 2de4109a..95dece38 100644 --- a/src/dorkbox/network/rmi/ResponseManager.kt +++ b/src/dorkbox/network/rmi/ResponseManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import mu.KLogger +import mu.KotlinLogging import java.util.concurrent.locks.* import kotlin.concurrent.read import kotlin.concurrent.write @@ -32,9 +34,10 @@ import kotlin.concurrent.write * * response ID's and the memory they hold will leak if the response never arrives! */ -internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) { +internal class ResponseManager() { companion object { val TIMEOUT_EXCEPTION = Exception() + private val logger: KLogger = KotlinLogging.logger(ResponseManager::class.java.simpleName) } @@ -55,22 +58,16 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! val ids = mutableListOf() - // MIN (32768) -> -1 (65535) // ZERO is special, and is never added! // ONE is special, and is used for ASYNC (the response will never be sent back) - // 2 (2) -> MAX (32767) - - for (id in Short.MIN_VALUE..-1) { - ids.add(id) - } - for (id in 2..Short.MAX_VALUE) { + for (id in 2..maxValuesInCache) { ids.add(id) } ids.shuffle() // populate the array of randomly assigned ID's + waiters. It is OK for this to happen in a new thread - actionDispatch.launch { + runBlocking { try { for (it in ids) { waiterCache.send(ResponseWaiter(it)) @@ -88,18 +85,18 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) * resume any pending remote object method invocations (if they are not async, or not manually waiting) * NOTE: async RMI will never call this (because async doesn't return a response) */ - suspend fun notifyWaiter(rmiId: Int, result: Any?, logger: KLogger) { - logger.trace { "RMI return message: $rmiId" } + suspend fun notifyWaiter(id: Int, result: Any?, logger: KLogger) { + logger.trace { "[RM] notify: $id" } val previous = pendingLock.write { - val previous = pending[rmiId] - pending[rmiId] = result + val previous = pending[id] + pending[id] = result previous } // if NULL, since either we don't exist (because we were async), or it was cancelled if (previous is ResponseWaiter) { - logger.trace { "RMI valid-cancel onMessage: $rmiId" } + logger.trace { "[RM] valid-cancel: $id" } // this means we were NOT timed out! (we cannot be timed out here) previous.doNotify() @@ -111,12 +108,12 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) * * This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different) */ - suspend fun getWaiterCallback(rmiId: Int, logger: KLogger): T? { - logger.trace { "RMI return message: $rmiId" } + suspend fun getWaiterCallback(id: Int, logger: KLogger): T? { + logger.trace { "[RM] get-callback: $id" } val previous = pendingLock.write { - val previous = pending[rmiId] - pending[rmiId] = null + val previous = pending[id] + pending[id] = null previous } @@ -140,21 +137,18 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. */ suspend fun prep(logger: KLogger): ResponseWaiter { - val responseRmi = waiterCache.receive() + val waiter = waiterCache.receive() rmiWaitersInUse.getAndIncrement() - logger.trace { "RMI count: ${rmiWaitersInUse.value}" } + logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" } // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) - responseRmi.prep() - - val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id) + waiter.prep() pendingLock.write { - // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually - pending[rmiId] = responseRmi + pending[waiter.id] = waiter } - return responseRmi + return waiter } /** @@ -162,39 +156,39 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) * * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. */ - suspend fun prepWithCallback(function: Any, logger: KLogger): Int { - val responseRmi = waiterCache.receive() + suspend fun prepWithCallback(logger: KLogger, function: Any): Int { + val waiter = waiterCache.receive() rmiWaitersInUse.getAndIncrement() - logger.trace { "RMI count: ${rmiWaitersInUse.value}" } + logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" } // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) - responseRmi.prep() + waiter.prep() // assign the callback that will be notified when the return message is received - responseRmi.result = function + waiter.result = function - val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id) + val id = RmiUtils.unpackUnsignedRight(waiter.id) pendingLock.write { - pending[rmiId] = responseRmi + pending[id] = waiter } - return rmiId + return id } /** * Cancels the RMI request in the given timeout, the callback is executed inside the read lock */ - fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) { - actionDispatch.launch { + fun cancelRequest(eventDispatch: CoroutineScope, timeoutMillis: Long, id: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) { + eventDispatch.launch { delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting // check if we have a result or not pendingLock.read { - val maybeResult = pending[rmiId] + val maybeResult = pending[id] if (maybeResult is ResponseWaiter) { - logger.trace { "RMI timeout ($timeoutMillis) with callback cancel: $rmiId" } + logger.trace { "[RM] timeout ($timeoutMillis) with callback cancel: $id" } maybeResult.cancel() onCancelled(maybeResult) @@ -211,17 +205,15 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) * @return the result (can be null) or timeout exception */ suspend fun waitForReply( - actionDispatch: CoroutineScope, + eventDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long, logger: KLogger, connection: Connection ): Any? { - val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) + val id = RmiUtils.unpackUnsignedRight(responseWaiter.id) - logger.trace { - "RMI waiting: $rmiId" - } + logger.trace { "[RM] wait: $id" } // NOTE: we ALWAYS send a response from the remote end (except when async). // @@ -229,13 +221,13 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) // 'timeout > 0' -> WAIT w/ TIMEOUT // 'timeout == 0' -> WAIT FOREVER if (timeoutMillis > 0) { - val responseTimeoutJob = actionDispatch.launch { + val responseTimeoutJob = eventDispatch.launch { delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting // check if we have a result or not - val maybeResult = pendingLock.read { pending[rmiId] } + val maybeResult = pendingLock.read { pending[id] } if (maybeResult is ResponseWaiter) { - logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } + logger.trace { "[RM] timeout ($timeoutMillis) cancel: $id" } maybeResult.cancel() } @@ -263,8 +255,8 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) // deletes the entry in the map val resultOrWaiter = pendingLock.write { - val previous = pending[rmiId] - pending[rmiId] = null + val previous = pending[id] + pending[id] = null previous } @@ -273,7 +265,7 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) rmiWaitersInUse.getAndDecrement() if (resultOrWaiter is ResponseWaiter) { - logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" } + logger.trace { "[RM] timeout cancel ($timeoutMillis): $id" } return if (connection.isClosed() || connection.isClosedViaAeron()) { null