Cleaned API and logging

This commit is contained in:
Robinson 2023-02-27 11:09:33 +01:00
parent e38ff52c08
commit 340dc4a36c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,7 +22,9 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KLogger import mu.KLogger
import mu.KotlinLogging
import java.util.concurrent.locks.* import java.util.concurrent.locks.*
import kotlin.concurrent.read import kotlin.concurrent.read
import kotlin.concurrent.write import kotlin.concurrent.write
@ -32,9 +34,10 @@ import kotlin.concurrent.write
* *
* response ID's and the memory they hold will leak if the response never arrives! * response ID's and the memory they hold will leak if the response never arrives!
*/ */
internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) { internal class ResponseManager() {
companion object { companion object {
val TIMEOUT_EXCEPTION = Exception() val TIMEOUT_EXCEPTION = Exception()
private val logger: KLogger = KotlinLogging.logger(ResponseManager::class.java.simpleName)
} }
@ -55,22 +58,16 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
val ids = mutableListOf<Int>() val ids = mutableListOf<Int>()
// MIN (32768) -> -1 (65535)
// ZERO is special, and is never added! // ZERO is special, and is never added!
// ONE is special, and is used for ASYNC (the response will never be sent back) // ONE is special, and is used for ASYNC (the response will never be sent back)
// 2 (2) -> MAX (32767)
for (id in 2..maxValuesInCache) {
for (id in Short.MIN_VALUE..-1) {
ids.add(id)
}
for (id in 2..Short.MAX_VALUE) {
ids.add(id) ids.add(id)
} }
ids.shuffle() ids.shuffle()
// populate the array of randomly assigned ID's + waiters. It is OK for this to happen in a new thread // populate the array of randomly assigned ID's + waiters. It is OK for this to happen in a new thread
actionDispatch.launch { runBlocking {
try { try {
for (it in ids) { for (it in ids) {
waiterCache.send(ResponseWaiter(it)) waiterCache.send(ResponseWaiter(it))
@ -88,18 +85,18 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
* resume any pending remote object method invocations (if they are not async, or not manually waiting) * resume any pending remote object method invocations (if they are not async, or not manually waiting)
* NOTE: async RMI will never call this (because async doesn't return a response) * NOTE: async RMI will never call this (because async doesn't return a response)
*/ */
suspend fun notifyWaiter(rmiId: Int, result: Any?, logger: KLogger) { suspend fun notifyWaiter(id: Int, result: Any?, logger: KLogger) {
logger.trace { "RMI return message: $rmiId" } logger.trace { "[RM] notify: $id" }
val previous = pendingLock.write { val previous = pendingLock.write {
val previous = pending[rmiId] val previous = pending[id]
pending[rmiId] = result pending[id] = result
previous previous
} }
// if NULL, since either we don't exist (because we were async), or it was cancelled // if NULL, since either we don't exist (because we were async), or it was cancelled
if (previous is ResponseWaiter) { if (previous is ResponseWaiter) {
logger.trace { "RMI valid-cancel onMessage: $rmiId" } logger.trace { "[RM] valid-cancel: $id" }
// this means we were NOT timed out! (we cannot be timed out here) // this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify() previous.doNotify()
@ -111,12 +108,12 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
* *
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different) * This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
*/ */
suspend fun <T> getWaiterCallback(rmiId: Int, logger: KLogger): T? { suspend fun <T> getWaiterCallback(id: Int, logger: KLogger): T? {
logger.trace { "RMI return message: $rmiId" } logger.trace { "[RM] get-callback: $id" }
val previous = pendingLock.write { val previous = pendingLock.write {
val previous = pending[rmiId] val previous = pending[id]
pending[rmiId] = null pending[id] = null
previous previous
} }
@ -140,21 +137,18 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/ */
suspend fun prep(logger: KLogger): ResponseWaiter { suspend fun prep(logger: KLogger): ResponseWaiter {
val responseRmi = waiterCache.receive() val waiter = waiterCache.receive()
rmiWaitersInUse.getAndIncrement() rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" } logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled) // this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep() waiter.prep()
val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id)
pendingLock.write { pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually pending[waiter.id] = waiter
pending[rmiId] = responseRmi
} }
return responseRmi return waiter
} }
/** /**
@ -162,39 +156,39 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
* *
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/ */
suspend fun prepWithCallback(function: Any, logger: KLogger): Int { suspend fun prepWithCallback(logger: KLogger, function: Any): Int {
val responseRmi = waiterCache.receive() val waiter = waiterCache.receive()
rmiWaitersInUse.getAndIncrement() rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" } logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled) // this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep() waiter.prep()
// assign the callback that will be notified when the return message is received // assign the callback that will be notified when the return message is received
responseRmi.result = function waiter.result = function
val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id) val id = RmiUtils.unpackUnsignedRight(waiter.id)
pendingLock.write { pendingLock.write {
pending[rmiId] = responseRmi pending[id] = waiter
} }
return rmiId return id
} }
/** /**
* Cancels the RMI request in the given timeout, the callback is executed inside the read lock * Cancels the RMI request in the given timeout, the callback is executed inside the read lock
*/ */
fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) { fun cancelRequest(eventDispatch: CoroutineScope, timeoutMillis: Long, id: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) {
actionDispatch.launch { eventDispatch.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not // check if we have a result or not
pendingLock.read { pendingLock.read {
val maybeResult = pending[rmiId] val maybeResult = pending[id]
if (maybeResult is ResponseWaiter) { if (maybeResult is ResponseWaiter) {
logger.trace { "RMI timeout ($timeoutMillis) with callback cancel: $rmiId" } logger.trace { "[RM] timeout ($timeoutMillis) with callback cancel: $id" }
maybeResult.cancel() maybeResult.cancel()
onCancelled(maybeResult) onCancelled(maybeResult)
@ -211,17 +205,15 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
* @return the result (can be null) or timeout exception * @return the result (can be null) or timeout exception
*/ */
suspend fun waitForReply( suspend fun waitForReply(
actionDispatch: CoroutineScope, eventDispatch: CoroutineScope,
responseWaiter: ResponseWaiter, responseWaiter: ResponseWaiter,
timeoutMillis: Long, timeoutMillis: Long,
logger: KLogger, logger: KLogger,
connection: Connection connection: Connection
): Any? { ): Any? {
val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) val id = RmiUtils.unpackUnsignedRight(responseWaiter.id)
logger.trace { logger.trace { "[RM] wait: $id" }
"RMI waiting: $rmiId"
}
// NOTE: we ALWAYS send a response from the remote end (except when async). // NOTE: we ALWAYS send a response from the remote end (except when async).
// //
@ -229,13 +221,13 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
// 'timeout > 0' -> WAIT w/ TIMEOUT // 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER // 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) { if (timeoutMillis > 0) {
val responseTimeoutJob = actionDispatch.launch { val responseTimeoutJob = eventDispatch.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not // check if we have a result or not
val maybeResult = pendingLock.read { pending[rmiId] } val maybeResult = pendingLock.read { pending[id] }
if (maybeResult is ResponseWaiter) { if (maybeResult is ResponseWaiter) {
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" } logger.trace { "[RM] timeout ($timeoutMillis) cancel: $id" }
maybeResult.cancel() maybeResult.cancel()
} }
@ -263,8 +255,8 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
// deletes the entry in the map // deletes the entry in the map
val resultOrWaiter = pendingLock.write { val resultOrWaiter = pendingLock.write {
val previous = pending[rmiId] val previous = pending[id]
pending[rmiId] = null pending[id] = null
previous previous
} }
@ -273,7 +265,7 @@ internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope)
rmiWaitersInUse.getAndDecrement() rmiWaitersInUse.getAndDecrement()
if (resultOrWaiter is ResponseWaiter) { if (resultOrWaiter is ResponseWaiter) {
logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" } logger.trace { "[RM] timeout cancel ($timeoutMillis): $id" }
return if (connection.isClosed() || connection.isClosedViaAeron()) { return if (connection.isClosed() || connection.isClosedViaAeron()) {
null null