Updated aeron event poller

This commit is contained in:
Robinson 2023-03-02 19:40:42 +01:00
parent fc30c16758
commit 7ec23c59fa
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -18,6 +18,12 @@ package dorkbox.network.aeron
import dorkbox.collections.ConcurrentIterator
import dorkbox.network.Configuration
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.concurrent.IdleStrategy
@ -34,7 +40,9 @@ class EventPoller: Closeable {
private val logger: KLogger = KotlinLogging.logger(EventPoller::class.java.simpleName)
private var configured = false
private lateinit var dispatch: ExecutorService
private lateinit var pollDispatcher: CoroutineDispatcher
private lateinit var dispatchScope: CoroutineScope
private lateinit var pollStrategy: CoroutineIdleStrategy
private lateinit var clonedStrategy: IdleStrategy
@ -42,7 +50,7 @@ class EventPoller: Closeable {
private var running = true
// this is thread safe
private val pollEvents = ConcurrentIterator<Pair<EventPoller.()->Int, ()->Unit>>()
private val pollEvents = ConcurrentIterator<Pair<suspend EventPoller.()->Int, suspend ()->Unit>>()
@Volatile
private var shutdownLatch = CountDownLatch(1)
@ -53,13 +61,14 @@ class EventPoller: Closeable {
running = true
configured = true
shutdownLatch = CountDownLatch(1)
dispatch = config.networkEventPoll
pollDispatcher = config.networkEventPoll
pollStrategy = config.pollIdleStrategy
clonedStrategy = config.pollIdleStrategy.cloneToNormal()
require(!dispatch.isTerminated) { "Unable to start the event dispatch in the terminated state!"}
dispatchScope = CoroutineScope(pollDispatcher)
require(!pollDispatcher.isActive) { "Unable to start the event dispatch in the terminated state!"}
dispatch.submit {
dispatchScope.launch {
val pollIdleStrategy = clonedStrategy
var pollCount = 0
@ -96,7 +105,7 @@ class EventPoller: Closeable {
shutdownLatch.countDown()
}
} else {
require(dispatch == config.networkEventPoll) {
require(pollDispatcher == config.networkEventPoll) {
"The network event dispatcher is different between the multiple instances of network clients/servers. There **WILL BE** thread starvation, so this behavior is forbidden!"
}
@ -109,7 +118,7 @@ class EventPoller: Closeable {
/**
* Will cause the executing thread to wait until the event has been started
*/
fun submit(action: EventPoller.() -> Int, onShutdown: ()->Unit = {}) {
fun submit(action: suspend EventPoller.() -> Int, onShutdown: suspend ()->Unit = {}) {
// this forces the current thread to WAIT until the network poll system has started
val pollStartupLatch = CountDownLatch(1)
@ -135,9 +144,15 @@ class EventPoller: Closeable {
// ONLY if there are no more poll-events do we ACTUALLY shut down.
// when an endpoint closes its polling, it will automatically be removed from this datastructure.
if (pollEvents.size() == 0) {
logger.error { "Closing the Network Event Poller..." }
running = false
shutdownLatch.await()
configured = false
runBlocking {
dispatchScope.cancel("Closed event dispatch")
logger.error { "Closed the Network Event Poller..." }
}
}
}
}