Async allocation and branch cleanup
This commit is contained in:
parent
69681f626a
commit
e5c35ea24e
|
@ -35,7 +35,6 @@ import kotlin.concurrent.write
|
||||||
internal class ResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
|
internal class ResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
|
||||||
companion object {
|
companion object {
|
||||||
val TIMEOUT_EXCEPTION = Exception()
|
val TIMEOUT_EXCEPTION = Exception()
|
||||||
val ASYNC_WAITER = ResponseWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -109,10 +108,7 @@ internal class ResponseManager(private val logger: KLogger, private val actionDi
|
||||||
*
|
*
|
||||||
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
|
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
|
||||||
*/
|
*/
|
||||||
internal suspend fun prep(isAsync: Boolean): ResponseWaiter {
|
internal suspend fun prep(): ResponseWaiter {
|
||||||
return if (isAsync) {
|
|
||||||
ASYNC_WAITER
|
|
||||||
} else {
|
|
||||||
val responseRmi = waiterCache.receive()
|
val responseRmi = waiterCache.receive()
|
||||||
rmiWaitersInUse.getAndIncrement()
|
rmiWaitersInUse.getAndIncrement()
|
||||||
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
|
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
|
||||||
|
@ -125,8 +121,7 @@ internal class ResponseManager(private val logger: KLogger, private val actionDi
|
||||||
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
|
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
|
||||||
}
|
}
|
||||||
|
|
||||||
responseRmi
|
return responseRmi
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -99,22 +99,23 @@ internal class RmiClient(val isGlobal: Boolean,
|
||||||
// 'timeout > 0' -> WAIT w/ TIMEOUT
|
// 'timeout > 0' -> WAIT w/ TIMEOUT
|
||||||
// 'timeout == 0' -> WAIT FOREVER
|
// 'timeout == 0' -> WAIT FOREVER
|
||||||
|
|
||||||
|
|
||||||
// If we are async, we ignore the response....
|
|
||||||
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code excution is in lockstep
|
|
||||||
val rmiWaiter = responseManager.prep(isAsync)
|
|
||||||
|
|
||||||
invokeMethod.isGlobal = isGlobal
|
invokeMethod.isGlobal = isGlobal
|
||||||
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
|
|
||||||
|
|
||||||
|
if (isAsync) {
|
||||||
|
// If we are async, we ignore the response....
|
||||||
|
|
||||||
|
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI)
|
||||||
|
|
||||||
|
connection.send(invokeMethod)
|
||||||
|
return null
|
||||||
|
} else {
|
||||||
|
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep
|
||||||
|
val rmiWaiter = responseManager.prep()
|
||||||
|
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
|
||||||
|
|
||||||
connection.send(invokeMethod)
|
connection.send(invokeMethod)
|
||||||
|
|
||||||
// if we are async, then this will immediately return
|
return responseManager.waitForReply(rmiWaiter, timeoutMillis)
|
||||||
return if (isAsync) {
|
|
||||||
null
|
|
||||||
} else {
|
|
||||||
responseManager.waitForReply(rmiWaiter, timeoutMillis)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user