Code polish

This commit is contained in:
Robinson 2023-06-25 12:01:56 +02:00
parent e02ef7d7e8
commit c9e71c56b1
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 126 additions and 33 deletions

View File

@ -21,7 +21,6 @@ import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.SerializationException import dorkbox.network.exceptions.SerializationException
import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.ServerException
import dorkbox.network.exceptions.TransmitException import dorkbox.network.exceptions.TransmitException
import dorkbox.network.handshake.ConnectionCounts
import dorkbox.network.ping.Ping import dorkbox.network.ping.Ping
import dorkbox.network.rmi.RmiSupportConnection import dorkbox.network.rmi.RmiSupportConnection
import dorkbox.network.rmi.messages.MethodResponse import dorkbox.network.rmi.messages.MethodResponse
@ -427,10 +426,19 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val connection = this val connection = this
// clean up the resources associated with this connection when it's closed
endPoint.isServer { endPoint.isServer {
// clean up the resources associated with this connection when it's closed
logger.debug { "[${connection}] freeing resources" } logger.debug { "[${connection}] freeing resources" }
connection.cleanup(handshake.connectionsPerIpCounts) sessionIdAllocator.free(info.sessionIdPub)
sessionIdAllocator.free(info.sessionIdSub)
streamIdAllocator.free(info.streamIdPub)
streamIdAllocator.free(info.streamIdSub)
if (remoteAddress != null) {
// unique for UDP endpoints
handshake.connectionsPerIpCounts.decrementSlow(remoteAddress)
}
} }
logger.debug {"[$toString0] connection closed"} logger.debug {"[$toString0] connection closed"}
@ -466,18 +474,4 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val other1 = other as Connection val other1 = other as Connection
return id == other1.id return id == other1.id
} }
// cleans up the connection information (only the server calls this!)
internal fun cleanup(connectionsPerIpCounts: ConnectionCounts) {
sessionIdAllocator.free(info.sessionIdPub)
sessionIdAllocator.free(info.sessionIdSub)
streamIdAllocator.free(info.streamIdPub)
streamIdAllocator.free(info.streamIdSub)
if (remoteAddress != null) {
// unique for UDP endpoints
connectionsPerIpCounts.decrementSlow(remoteAddress)
}
}
} }

View File

@ -23,55 +23,154 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import java.util.concurrent.* import java.util.concurrent.*
/** /**
* This MUST be run on multiple coroutines! There are deadlock issues if it is only one. * This MUST be run on multiple coroutines! There are deadlock issues if it is only one.
*
* This class LITERALLY forces a coroutine dispatcher to be exclusively on a single thread.
*
* WARNING: The logic in this class will ONLY work in this class, as it relies on this specific behavior. Do not use it elsewhere!
*/ */
class EventDispatcher { enum class EventDispatcher {
// NOTE: CLOSE must be last!
HANDSHAKE, CONNECT, DISCONNECT, RESPONSE_MANAGER, ERROR, CLOSE;
companion object { companion object {
private val DEBUG_EVENTS = false private val DEBUG_EVENTS = false
private val traceId = atomic(0) private val traceId = atomic(0)
private val logger = KotlinLogging.logger(EventDispatcher::class.java.simpleName) private val logger = KotlinLogging.logger(EventDispatcher::class.java.simpleName)
enum class EVENT { private val threadIds = values().map { atomic(0L) }.toTypedArray()
HANDSHAKE, INIT, CONNECT, DISCONNECT, CLOSE, RESPONSE_MANAGER
}
private val eventData = Array(EVENT.values().size) {
private val executors = values().map { event ->
// It CANNOT be the default dispatch because there will be thread starvation // It CANNOT be the default dispatch because there will be thread starvation
// NOTE: THIS CANNOT CHANGE!! IT WILL BREAK EVERYTHING IF IT CHANGES! // NOTE: THIS CANNOT CHANGE!! IT WILL BREAK EVERYTHING IF IT CHANGES!
val executor = Executors.newSingleThreadExecutor( Executors.newSingleThreadExecutor(
NamedThreadFactory("Event Dispatcher-${EVENT.values()[it].name}", Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true) NamedThreadFactory("Event Dispatcher-${event.name}",
).asCoroutineDispatcher() Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true) { thread ->
// when a new thread is created, assign it to the array
CoroutineScope(executor + SupervisorJob()) threadIds[event.ordinal].lazySet(thread.id)
}
)
} }
val isActive: Boolean private val eventData = executors.map { executor ->
get() = eventData.all { it.isActive } CoroutineScope(executor.asCoroutineDispatcher() + SupervisorJob())
}
init {
executors.forEachIndexed { _, executor ->
executor.submit {
// this is to create a new thread only, so that the thread ID can be assigned
}
}
}
/**
* Checks if the current execution thread is running inside one of the event dispatchers listed.
*
* No values specified means we check ALL events
*/
fun isDispatch(): Boolean {
return isCurrentEvent(*values())
}
/**
* Checks if the current execution thread is running inside one of the event dispatchers listed.
*
* No values specified means we check ALL events
*/
fun isCurrentEvent(vararg events: EventDispatcher = values()): Boolean {
val threadId = Thread.currentThread().id
events.forEach { event ->
if (threadIds[event.ordinal].value == threadId) {
return true
}
}
return false
}
/**
* Checks if the current execution thread is NOT running inside one of the event dispatchers listed.
*
* No values specified means we check ALL events
*/
fun isNotCurrentEvent(vararg events: EventDispatcher = values()): Boolean {
val currentDispatch = getCurrentEvent() ?: return false
return events.contains(currentDispatch)
}
/**
* @return which event dispatch thread we are running in, if any
*/
fun getCurrentEvent(): EventDispatcher? {
val threadId = Thread.currentThread().id
values().forEach { event ->
if (threadIds[event.ordinal].value == threadId) {
return event
}
}
return null
}
/** /**
* Each event type runs inside its own coroutine dispatcher. * Each event type runs inside its own coroutine dispatcher.
* *
* We want EACH event type to run in its own dispatcher... on its OWN thread, in order to prevent deadlocks * We want EACH event type to run in its own dispatcher... on its OWN thread, in order to prevent deadlocks
* This is because there are blocking dependencies: DISCONNECT -> CONNECT. * This is because there are blocking dependencies: DISCONNECT -> CONNECT.
*
* If an event is RE-ENTRANT, then it will immediately execute!
*/ */
fun launch(event: EVENT, function: suspend CoroutineScope.() -> Unit): Job { private fun launch(event: EventDispatcher, function: suspend () -> Unit): Job {
val eventId = event.ordinal
return if (DEBUG_EVENTS) { return if (DEBUG_EVENTS) {
val id = traceId.getAndIncrement() val id = traceId.getAndIncrement()
eventData[event.ordinal].launch(block = { eventData[eventId].launch(block = {
logger.debug { "Starting $event : $id" } logger.debug { "Starting $event : $id" }
function() function()
logger.debug { "Finished $event : $id" } logger.debug { "Finished $event : $id" }
}) })
} else { } else {
eventData[event.ordinal].launch(block = function) eventData[eventId].launch {
function()
}
}
}
suspend fun launchSequentially(endEvent: EventDispatcher, function: suspend () -> Unit) {
// If one of our callbacks requested a shutdown, we wait until all callbacks have run ... THEN shutdown
val event = getCurrentEvent()
val index = event?.ordinal ?: -1
// This will loop through until it runs on the CLOSE EventDispatcher
if (index < endEvent.ordinal) {
// If this runs inside EVENT.CONNECT/DISCONNECT/ETC, we must ***WAIT*** until all listeners have been called!
// this problem is solved by running AGAIN after we have finished running whatever event dispatcher we are currently on
// MORE SPECIFICALLY, we must run at the end of our current one, but repeatedly until CLOSE
EventDispatcher.launch(values()[index+1]) {
launchSequentially(endEvent, function)
}
} else {
function()
} }
} }
} }
fun launch(function: suspend () -> Unit): Job {
return launch(this, function)
}
} }