From cbfe51f746d72e2829576ad26ac657934d58b54c Mon Sep 17 00:00:00 2001 From: Robinson Date: Mon, 13 Nov 2023 14:10:00 +0100 Subject: [PATCH] Added Handshake dispatch (was required, and must be single threaded) --- .../network/connection/EventDispatcher.kt | 18 +++--------------- .../handshake/ServerHandshakePollers.kt | 5 ++--- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/dorkbox/network/connection/EventDispatcher.kt b/src/dorkbox/network/connection/EventDispatcher.kt index 2702b041..0501cf2d 100644 --- a/src/dorkbox/network/connection/EventDispatcher.kt +++ b/src/dorkbox/network/connection/EventDispatcher.kt @@ -31,7 +31,7 @@ import java.util.concurrent.* internal class EventDispatcher(val type: String) { enum class EDType { // CLOSE must be last! - CONNECT, ERROR, CLOSE + HANDSHAKE, CONNECT, ERROR, CLOSE } internal class ED(private val dispatcher: EventDispatcher, private val type: EDType) { @@ -44,26 +44,12 @@ internal class EventDispatcher(val type: String) { } } - internal class Dispatch() { - companion object { - val executor = Executors.newCachedThreadPool( - NamedThreadFactory("Multi", Configuration.networkThreadGroup) - ) - } - - fun launch(function: () -> Unit) { - executor.submit(function) - } - } - companion object { private val DEBUG_EVENTS = false private val traceId = atomic(0) private val typedEntries: Array - val MULTI = Dispatch() - init { typedEntries = EDType.entries.toTypedArray() } @@ -91,6 +77,7 @@ internal class EventDispatcher(val type: String) { + val HANDSHAKE: ED val CONNECT: ED val ERROR: ED val CLOSE: ED @@ -103,6 +90,7 @@ internal class EventDispatcher(val type: String) { } } + HANDSHAKE = ED(this, EDType.HANDSHAKE) CONNECT = ED(this, EDType.CONNECT) ERROR = ED(this, EDType.ERROR) CLOSE = ED(this, EDType.CLOSE) diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index 247dda5d..fb78c1c0 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -26,7 +26,6 @@ import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake import dorkbox.network.aeron.AeronPoller import dorkbox.network.connection.Connection -import dorkbox.network.connection.EventDispatcher import dorkbox.network.connection.IpInfo import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.ServerHandshakeException @@ -122,7 +121,7 @@ internal object ServerHandshakePollers { // NOTE: This MUST to happen in separates thread so that we can take as long as we need when creating publications and handshaking, // because under load -- this will REGULARLY timeout! Under no circumstance can this happen in the main processing thread!! - EventDispatcher.MULTI.launch { + server.eventDispatch.HANDSHAKE.launch { // we have read all the data, now dispatch it. // HandshakeMessage.HELLO // HandshakeMessage.DONE @@ -377,7 +376,7 @@ internal object ServerHandshakePollers { // NOTE: This MUST to happen in separates thread so that we can take as long as we need when creating publications and handshaking, // because under load -- this will REGULARLY timeout! Under no circumstance can this happen in the main processing thread!! - EventDispatcher.MULTI.launch { + server.eventDispatch.HANDSHAKE.launch { // HandshakeMessage.HELLO // HandshakeMessage.DONE val messageState = message.state