From c9e71c56b10c43a778b99a597f49912995927bcb Mon Sep 17 00:00:00 2001 From: Robinson Date: Sun, 25 Jun 2023 12:01:56 +0200 Subject: [PATCH] Code polish --- src/dorkbox/network/connection/Connection.kt | 28 ++-- .../network/connection/EventDispatcher.kt | 131 +++++++++++++++--- 2 files changed, 126 insertions(+), 33 deletions(-) diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 361d07bc..d4c41531 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -21,7 +21,6 @@ import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.SerializationException import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.TransmitException -import dorkbox.network.handshake.ConnectionCounts import dorkbox.network.ping.Ping import dorkbox.network.rmi.RmiSupportConnection import dorkbox.network.rmi.messages.MethodResponse @@ -427,10 +426,19 @@ open class Connection(connectionParameters: ConnectionParams<*>) { val connection = this - // clean up the resources associated with this connection when it's closed endPoint.isServer { + // clean up the resources associated with this connection when it's closed logger.debug { "[${connection}] freeing resources" } - connection.cleanup(handshake.connectionsPerIpCounts) + sessionIdAllocator.free(info.sessionIdPub) + sessionIdAllocator.free(info.sessionIdSub) + + streamIdAllocator.free(info.streamIdPub) + streamIdAllocator.free(info.streamIdSub) + + if (remoteAddress != null) { + // unique for UDP endpoints + handshake.connectionsPerIpCounts.decrementSlow(remoteAddress) + } } logger.debug {"[$toString0] connection closed"} @@ -466,18 +474,4 @@ open class Connection(connectionParameters: ConnectionParams<*>) { val other1 = other as Connection return id == other1.id } - - // cleans up the connection information (only the server calls this!) - internal fun cleanup(connectionsPerIpCounts: ConnectionCounts) { - sessionIdAllocator.free(info.sessionIdPub) - sessionIdAllocator.free(info.sessionIdSub) - - streamIdAllocator.free(info.streamIdPub) - streamIdAllocator.free(info.streamIdSub) - - if (remoteAddress != null) { - // unique for UDP endpoints - connectionsPerIpCounts.decrementSlow(remoteAddress) - } - } } diff --git a/src/dorkbox/network/connection/EventDispatcher.kt b/src/dorkbox/network/connection/EventDispatcher.kt index e1ce7d32..973a9700 100644 --- a/src/dorkbox/network/connection/EventDispatcher.kt +++ b/src/dorkbox/network/connection/EventDispatcher.kt @@ -23,55 +23,154 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging import java.util.concurrent.* /** * This MUST be run on multiple coroutines! There are deadlock issues if it is only one. + * + * This class LITERALLY forces a coroutine dispatcher to be exclusively on a single thread. + * + * WARNING: The logic in this class will ONLY work in this class, as it relies on this specific behavior. Do not use it elsewhere! */ -class EventDispatcher { +enum class EventDispatcher { + // NOTE: CLOSE must be last! + HANDSHAKE, CONNECT, DISCONNECT, RESPONSE_MANAGER, ERROR, CLOSE; + companion object { private val DEBUG_EVENTS = false private val traceId = atomic(0) private val logger = KotlinLogging.logger(EventDispatcher::class.java.simpleName) - enum class EVENT { - HANDSHAKE, INIT, CONNECT, DISCONNECT, CLOSE, RESPONSE_MANAGER - } + private val threadIds = values().map { atomic(0L) }.toTypedArray() - private val eventData = Array(EVENT.values().size) { + + private val executors = values().map { event -> // It CANNOT be the default dispatch because there will be thread starvation // NOTE: THIS CANNOT CHANGE!! IT WILL BREAK EVERYTHING IF IT CHANGES! - val executor = Executors.newSingleThreadExecutor( - NamedThreadFactory("Event Dispatcher-${EVENT.values()[it].name}", Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true) - ).asCoroutineDispatcher() - - CoroutineScope(executor + SupervisorJob()) + Executors.newSingleThreadExecutor( + NamedThreadFactory("Event Dispatcher-${event.name}", + Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true) { thread -> + // when a new thread is created, assign it to the array + threadIds[event.ordinal].lazySet(thread.id) + } + ) } - val isActive: Boolean - get() = eventData.all { it.isActive } + private val eventData = executors.map { executor -> + CoroutineScope(executor.asCoroutineDispatcher() + SupervisorJob()) + } + + + init { + executors.forEachIndexed { _, executor -> + executor.submit { + // this is to create a new thread only, so that the thread ID can be assigned + } + } + } + + /** + * Checks if the current execution thread is running inside one of the event dispatchers listed. + * + * No values specified means we check ALL events + */ + fun isDispatch(): Boolean { + return isCurrentEvent(*values()) + } + + /** + * Checks if the current execution thread is running inside one of the event dispatchers listed. + * + * No values specified means we check ALL events + */ + fun isCurrentEvent(vararg events: EventDispatcher = values()): Boolean { + val threadId = Thread.currentThread().id + + events.forEach { event -> + if (threadIds[event.ordinal].value == threadId) { + return true + } + } + + return false + } + + /** + * Checks if the current execution thread is NOT running inside one of the event dispatchers listed. + * + * No values specified means we check ALL events + */ + fun isNotCurrentEvent(vararg events: EventDispatcher = values()): Boolean { + val currentDispatch = getCurrentEvent() ?: return false + + return events.contains(currentDispatch) + } + + /** + * @return which event dispatch thread we are running in, if any + */ + fun getCurrentEvent(): EventDispatcher? { + val threadId = Thread.currentThread().id + + values().forEach { event -> + if (threadIds[event.ordinal].value == threadId) { + return event + } + } + + return null + } /** * Each event type runs inside its own coroutine dispatcher. * * We want EACH event type to run in its own dispatcher... on its OWN thread, in order to prevent deadlocks * This is because there are blocking dependencies: DISCONNECT -> CONNECT. + * + * If an event is RE-ENTRANT, then it will immediately execute! */ - fun launch(event: EVENT, function: suspend CoroutineScope.() -> Unit): Job { + private fun launch(event: EventDispatcher, function: suspend () -> Unit): Job { + val eventId = event.ordinal + return if (DEBUG_EVENTS) { val id = traceId.getAndIncrement() - eventData[event.ordinal].launch(block = { + eventData[eventId].launch(block = { logger.debug { "Starting $event : $id" } function() logger.debug { "Finished $event : $id" } }) } else { - eventData[event.ordinal].launch(block = function) + eventData[eventId].launch { + function() + } + } + } + + suspend fun launchSequentially(endEvent: EventDispatcher, function: suspend () -> Unit) { + // If one of our callbacks requested a shutdown, we wait until all callbacks have run ... THEN shutdown + val event = getCurrentEvent() + + val index = event?.ordinal ?: -1 + + // This will loop through until it runs on the CLOSE EventDispatcher + if (index < endEvent.ordinal) { + // If this runs inside EVENT.CONNECT/DISCONNECT/ETC, we must ***WAIT*** until all listeners have been called! + // this problem is solved by running AGAIN after we have finished running whatever event dispatcher we are currently on + // MORE SPECIFICALLY, we must run at the end of our current one, but repeatedly until CLOSE + + EventDispatcher.launch(values()[index+1]) { + launchSequentially(endEvent, function) + } + } else { + function() } } } + + fun launch(function: suspend () -> Unit): Job { + return launch(this, function) + } }