made event access more east to understand

This commit is contained in:
Robinson 2023-07-03 14:58:29 +02:00
parent 7b7910d078
commit a4e2e714c4
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -42,6 +42,8 @@ internal class EventPoller {
internal const val REMOVE = -1 internal const val REMOVE = -1
val eventLogger = KotlinLogging.logger(EventPoller::class.java.simpleName) val eventLogger = KotlinLogging.logger(EventPoller::class.java.simpleName)
private class EventAction(val onAction: suspend ()->Int, val onClose: suspend ()->Unit)
private val pollDispatcher = Executors.newSingleThreadExecutor( private val pollDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory("Poll Dispatcher", Configuration.networkThreadGroup, true) NamedThreadFactory("Poll Dispatcher", Configuration.networkThreadGroup, true)
).asCoroutineDispatcher() ).asCoroutineDispatcher()
@ -57,7 +59,7 @@ internal class EventPoller {
private var mutex = Mutex() private var mutex = Mutex()
// this is thread safe // this is thread safe
private val pollEvents = ConcurrentIterator<Pair<suspend EventPoller.()->Int, suspend ()->Unit>>() private val pollEvents = ConcurrentIterator<EventAction>()
private val submitEvents = atomic(0) private val submitEvents = atomic(0)
private val configureEventsEndpoints = mutableSetOf<ByteArrayWrapper>() private val configureEventsEndpoints = mutableSetOf<ByteArrayWrapper>()
@ -105,7 +107,7 @@ internal class EventPoller {
try { try {
// check to see if we should remove this event (when a client/server closes, it is removed) // check to see if we should remove this event (when a client/server closes, it is removed)
// once ALL endpoint are closed, this is shutdown. // once ALL endpoint are closed, this is shutdown.
val poll = it.first(this@EventPoller) val poll = it.onAction()
// <0 means we remove the event from processing // <0 means we remove the event from processing
// 0 means we idle // 0 means we idle
@ -113,7 +115,7 @@ internal class EventPoller {
if (poll < 0) { if (poll < 0) {
// remove our event, it is no longer valid // remove our event, it is no longer valid
pollEvents.remove(this) pollEvents.remove(this)
it.second() // shutting down it.onClose() // shutting down
// check to see if we requested a shutdown // check to see if we requested a shutdown
if (delayClose) { if (delayClose) {
@ -127,7 +129,7 @@ internal class EventPoller {
// remove our event, it is no longer valid // remove our event, it is no longer valid
pollEvents.remove(this) pollEvents.remove(this)
it.second() // shutting down it.onClose() // shutting down
// check to see if we requested a shutdown // check to see if we requested a shutdown
if (delayClose) { if (delayClose) {
@ -143,7 +145,7 @@ internal class EventPoller {
pollEvents.forEachRemovable { pollEvents.forEachRemovable {
// remove our event, it is no longer valid // remove our event, it is no longer valid
pollEvents.remove(this) pollEvents.remove(this)
it.second() // shutting down it.onClose() // shutting down
} }
shutdownLatch.countDown() shutdownLatch.countDown()
@ -159,14 +161,14 @@ internal class EventPoller {
/** /**
* Will cause the executing thread to wait until the event has been started * Will cause the executing thread to wait until the event has been started
*/ */
suspend fun submit(action: suspend EventPoller.() -> Int, onShutdown: suspend () -> Unit) = mutex.withLock { suspend fun submit(action: suspend () -> Int, onShutdown: suspend () -> Unit) = mutex.withLock {
submitEvents.getAndIncrement() submitEvents.getAndIncrement()
// this forces the current thread to WAIT until the network poll system has started // this forces the current thread to WAIT until the network poll system has started
val pollStartupLatch = CountDownLatch(1) val pollStartupLatch = CountDownLatch(1)
pollEvents.add(Pair(action, onShutdown)) pollEvents.add(EventAction(action, onShutdown))
pollEvents.add(Pair( pollEvents.add(EventAction(
{ {
pollStartupLatch.countDown() pollStartupLatch.countDown()