diff --git a/src/dorkbox/network/aeron/EventPoller.kt b/src/dorkbox/network/aeron/EventPoller.kt index fd136c72..59775a74 100644 --- a/src/dorkbox/network/aeron/EventPoller.kt +++ b/src/dorkbox/network/aeron/EventPoller.kt @@ -18,6 +18,12 @@ package dorkbox.network.aeron import dorkbox.collections.ConcurrentIterator import dorkbox.network.Configuration +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import mu.KLogger import mu.KotlinLogging import org.agrona.concurrent.IdleStrategy @@ -34,7 +40,9 @@ class EventPoller: Closeable { private val logger: KLogger = KotlinLogging.logger(EventPoller::class.java.simpleName) private var configured = false - private lateinit var dispatch: ExecutorService + private lateinit var pollDispatcher: CoroutineDispatcher + private lateinit var dispatchScope: CoroutineScope + private lateinit var pollStrategy: CoroutineIdleStrategy private lateinit var clonedStrategy: IdleStrategy @@ -42,7 +50,7 @@ class EventPoller: Closeable { private var running = true // this is thread safe - private val pollEvents = ConcurrentIteratorInt, ()->Unit>>() + private val pollEvents = ConcurrentIteratorInt, suspend ()->Unit>>() @Volatile private var shutdownLatch = CountDownLatch(1) @@ -53,13 +61,14 @@ class EventPoller: Closeable { running = true configured = true shutdownLatch = CountDownLatch(1) - dispatch = config.networkEventPoll + pollDispatcher = config.networkEventPoll pollStrategy = config.pollIdleStrategy clonedStrategy = config.pollIdleStrategy.cloneToNormal() - require(!dispatch.isTerminated) { "Unable to start the event dispatch in the terminated state!"} + dispatchScope = CoroutineScope(pollDispatcher) + require(!pollDispatcher.isActive) { "Unable to start the event dispatch in the terminated state!"} - dispatch.submit { + dispatchScope.launch { val pollIdleStrategy = clonedStrategy var pollCount = 0 @@ -96,7 +105,7 @@ class EventPoller: Closeable { shutdownLatch.countDown() } } else { - require(dispatch == config.networkEventPoll) { + require(pollDispatcher == config.networkEventPoll) { "The network event dispatcher is different between the multiple instances of network clients/servers. There **WILL BE** thread starvation, so this behavior is forbidden!" } @@ -109,7 +118,7 @@ class EventPoller: Closeable { /** * Will cause the executing thread to wait until the event has been started */ - fun submit(action: EventPoller.() -> Int, onShutdown: ()->Unit = {}) { + fun submit(action: suspend EventPoller.() -> Int, onShutdown: suspend ()->Unit = {}) { // this forces the current thread to WAIT until the network poll system has started val pollStartupLatch = CountDownLatch(1) @@ -135,9 +144,15 @@ class EventPoller: Closeable { // ONLY if there are no more poll-events do we ACTUALLY shut down. // when an endpoint closes its polling, it will automatically be removed from this datastructure. if (pollEvents.size() == 0) { + logger.error { "Closing the Network Event Poller..." } running = false shutdownLatch.await() configured = false + + runBlocking { + dispatchScope.cancel("Closed event dispatch") + logger.error { "Closed the Network Event Poller..." } + } } } }