ResponseManager uses its own, internal dispatcher for events

This commit is contained in:
Robinson 2023-09-04 00:01:27 +02:00
parent e7999d3095
commit 6291e1aa77
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 23 additions and 6 deletions

View File

@ -15,14 +15,16 @@
*/
package dorkbox.network.rmi
import dorkbox.network.Configuration
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EventDispatcher
import dorkbox.objectPool.ObjectPool
import dorkbox.objectPool.SuspendingPool
import dorkbox.util.NamedThreadFactory
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.*
import mu.KLogger
import mu.KotlinLogging
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.concurrent.read
import kotlin.concurrent.write
@ -47,6 +49,14 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
private val logger: KLogger = KotlinLogging.logger(ResponseManager::class.java.simpleName)
}
private val executor = Executors.newSingleThreadExecutor(
NamedThreadFactory("ResponseManager",
Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true)
)
private val scope = CoroutineScope(executor.asCoroutineDispatcher() + SupervisorJob())
private val rmiWaitersInUse = atomic(0)
private val waiterCache: SuspendingPool<ResponseWaiter>
@ -78,7 +88,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
* resume any pending remote object method invocations (if they are not async, or not manually waiting)
* NOTE: async RMI will never call this (because async doesn't return a response)
*/
suspend fun notifyWaiter(id: Int, result: Any?, logger: KLogger) {
fun notifyWaiter(id: Int, result: Any?, logger: KLogger) {
logger.trace { "[RM] notify: $id" }
val previous = pendingLock.write {
@ -92,7 +102,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
logger.trace { "[RM] valid-cancel: $id" }
// this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify()
runBlocking {
previous.doNotify()
}
}
}
@ -174,7 +186,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
* Cancels the RMI request in the given timeout, the callback is executed inside the read lock
*/
suspend fun cancelRequest(timeoutMillis: Long, id: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) {
EventDispatcher.RESPONSE_MANAGER.launch {
scope.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
@ -213,7 +225,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) {
val responseTimeoutJob = EventDispatcher.RESPONSE_MANAGER.launch {
val responseTimeoutJob = scope.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
@ -283,5 +295,10 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
pending[index] = null
}
}
scope.cancel("Closing the response manager for RMI")
withContext(Dispatchers.IO) {
executor.awaitTermination(500, TimeUnit.MILLISECONDS)
}
}
}