Renamed RmiWaiter -> ResponseWaiter
This commit is contained in:
parent
04ca513cc3
commit
8f39813ccf
|
@ -17,7 +17,7 @@ package dorkbox.network.rmi
|
||||||
|
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
|
||||||
internal data class RmiWaiter(val id: Int) {
|
internal data class ResponseWaiter(val id: Int) {
|
||||||
// this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting
|
// this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting
|
||||||
// https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified
|
// https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified
|
||||||
// https://kotlinlang.org/docs/reference/coroutines/channels.html
|
// https://kotlinlang.org/docs/reference/coroutines/channels.html
|
|
@ -35,7 +35,7 @@ import kotlin.concurrent.write
|
||||||
internal class RmiResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
|
internal class RmiResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
|
||||||
companion object {
|
companion object {
|
||||||
val TIMEOUT_EXCEPTION = Exception()
|
val TIMEOUT_EXCEPTION = Exception()
|
||||||
val ASYNC_WAITER = RmiWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters.
|
val ASYNC_WAITER = ResponseWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
// 1 is reserved for ASYNC
|
// 1 is reserved for ASYNC
|
||||||
private val maxValuesInCache = 65535
|
private val maxValuesInCache = 65535
|
||||||
private val rmiWaitersInUse = atomic(0)
|
private val rmiWaitersInUse = atomic(0)
|
||||||
private val rmiWaiterCache = Channel<RmiWaiter>(maxValuesInCache)
|
private val rmiWaiterCache = Channel<ResponseWaiter>(maxValuesInCache)
|
||||||
|
|
||||||
private val pendingLock = ReentrantReadWriteLock()
|
private val pendingLock = ReentrantReadWriteLock()
|
||||||
private val pending = arrayOfNulls<Any?>(maxValuesInCache+1) // +1 because it's possible to have the value 65535 in the cache
|
private val pending = arrayOfNulls<Any?>(maxValuesInCache+1) // +1 because it's possible to have the value 65535 in the cache
|
||||||
|
@ -73,7 +73,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
// populate the array of randomly assigned ID's + waiters. This can happen in a new thread
|
// populate the array of randomly assigned ID's + waiters. This can happen in a new thread
|
||||||
actionDispatch.launch {
|
actionDispatch.launch {
|
||||||
for (it in ids) {
|
for (it in ids) {
|
||||||
rmiWaiterCache.offer(RmiWaiter(it))
|
rmiWaiterCache.offer(ResponseWaiter(it))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
}
|
}
|
||||||
|
|
||||||
// if NULL, since either we don't exist (because we were async), or it was cancelled
|
// if NULL, since either we don't exist (because we were async), or it was cancelled
|
||||||
if (previous is RmiWaiter) {
|
if (previous is ResponseWaiter) {
|
||||||
logger.trace { "RMI valid-cancel: $rmiId" }
|
logger.trace { "RMI valid-cancel: $rmiId" }
|
||||||
|
|
||||||
// this means we were NOT timed out! (we cannot be timed out here)
|
// this means we were NOT timed out! (we cannot be timed out here)
|
||||||
|
@ -107,7 +107,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
*
|
*
|
||||||
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
|
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
|
||||||
*/
|
*/
|
||||||
internal suspend fun prep(isAsync: Boolean): RmiWaiter {
|
internal suspend fun prep(isAsync: Boolean): ResponseWaiter {
|
||||||
return if (isAsync) {
|
return if (isAsync) {
|
||||||
ASYNC_WAITER
|
ASYNC_WAITER
|
||||||
} else {
|
} else {
|
||||||
|
@ -130,8 +130,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
/**
|
/**
|
||||||
* @return the result (can be null) or timeout exception
|
* @return the result (can be null) or timeout exception
|
||||||
*/
|
*/
|
||||||
suspend fun waitForReply(rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? {
|
suspend fun waitForReply(responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? {
|
||||||
val rmiId = RmiUtils.unpackUnsignedRight(rmiWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
|
val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
|
||||||
|
|
||||||
logger.trace {
|
logger.trace {
|
||||||
"RMI waiting: $rmiId"
|
"RMI waiting: $rmiId"
|
||||||
|
@ -151,7 +151,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
|
|
||||||
// check if we have a result or not
|
// check if we have a result or not
|
||||||
val maybeResult = pendingLock.read { pending[rmiId] }
|
val maybeResult = pendingLock.read { pending[rmiId] }
|
||||||
if (maybeResult is RmiWaiter) {
|
if (maybeResult is ResponseWaiter) {
|
||||||
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
|
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
|
||||||
|
|
||||||
maybeResult.cancel()
|
maybeResult.cancel()
|
||||||
|
@ -164,7 +164,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
// if no response yet, it will suspend and either
|
// if no response yet, it will suspend and either
|
||||||
// A) get response
|
// A) get response
|
||||||
// B) timeout
|
// B) timeout
|
||||||
rmiWaiter.doWait()
|
responseWaiter.doWait()
|
||||||
|
|
||||||
// always cancel the timeout
|
// always cancel the timeout
|
||||||
responseTimeoutJob.cancel()
|
responseTimeoutJob.cancel()
|
||||||
|
@ -174,7 +174,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
// If the response is ALREADY here, the doWait() returns instantly (with result)
|
// If the response is ALREADY here, the doWait() returns instantly (with result)
|
||||||
// if no response yet, it will suspend and
|
// if no response yet, it will suspend and
|
||||||
// A) get response
|
// A) get response
|
||||||
rmiWaiter.doWait()
|
responseWaiter.doWait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -185,10 +185,10 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
|
||||||
}
|
}
|
||||||
|
|
||||||
// always return the waiter to the cache
|
// always return the waiter to the cache
|
||||||
rmiWaiterCache.send(rmiWaiter)
|
rmiWaiterCache.send(responseWaiter)
|
||||||
rmiWaitersInUse.getAndDecrement()
|
rmiWaitersInUse.getAndDecrement()
|
||||||
|
|
||||||
if (resultOrWaiter is RmiWaiter) {
|
if (resultOrWaiter is ResponseWaiter) {
|
||||||
logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" }
|
logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" }
|
||||||
|
|
||||||
return TIMEOUT_EXCEPTION
|
return TIMEOUT_EXCEPTION
|
||||||
|
|
Loading…
Reference in New Issue
Block a user