From 5d2e6ac5516e4adb4c0b709cad4921d752dead73 Mon Sep 17 00:00:00 2001 From: Robinson Date: Fri, 26 May 2023 15:41:21 +0200 Subject: [PATCH] Cleaned up server handshakes --- .../network/handshake/ServerHandshake.kt | 13 +++- .../handshake/ServerHandshakePollers.kt | 75 +++++++------------ 2 files changed, 38 insertions(+), 50 deletions(-) diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 23c3a96d..fdd7b71f 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -23,8 +23,13 @@ import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator import dorkbox.network.aeron.mediaDriver.ServerIpcConnectionDriver import dorkbox.network.aeron.mediaDriver.ServerUdpConnectionDriver import dorkbox.network.aeron.mediaDriver.ServerUdpHandshakeDriver -import dorkbox.network.connection.* +import dorkbox.network.connection.Connection +import dorkbox.network.connection.ConnectionParams +import dorkbox.network.connection.EndPoint +import dorkbox.network.connection.EventDispatcher import dorkbox.network.connection.EventDispatcher.Companion.EVENT +import dorkbox.network.connection.ListenerManager +import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.exceptions.AllocationException import dorkbox.util.sync.CountDownLatch import io.aeron.Publication @@ -33,7 +38,7 @@ import net.jodah.expiringmap.ExpirationPolicy import net.jodah.expiringmap.ExpiringMap import java.net.Inet4Address import java.net.InetAddress -import java.util.concurrent.TimeUnit +import java.util.concurrent.* /** @@ -375,7 +380,6 @@ internal class ServerHandshake( suspend fun processUdpHandshakeMessageServer( server: Server, mediaDriver: ServerUdpHandshakeDriver, - driver: AeronDriver, handshakePublication: Publication, clientAddress: InetAddress, clientAddressString: String, @@ -521,8 +525,10 @@ internal class ServerHandshake( listenAddress = mediaDriver.listenAddress, remoteAddress = clientAddress, remoteAddressString = clientAddressString, + portPub = portPub, portSub = portSub, + logInfo = logType, isReliable = isReliable, logger = logger @@ -533,6 +539,7 @@ internal class ServerHandshake( "SERVER INFO:\n" + "sessionId PUB: $connectionSessionIdPub\n" + "sessionId SUB: $connectionSessionIdSub\n" + + "streamId PUB: $connectionStreamIdPub\n" + "streamId SUB: $connectionStreamIdSub\n" + diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index bfceff54..d4adbb91 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -18,7 +18,6 @@ package dorkbox.network.handshake -import dorkbox.collections.LockFreeIntMap import dorkbox.netUtil.IP import dorkbox.network.Server import dorkbox.network.ServerConfiguration @@ -33,7 +32,6 @@ import dorkbox.network.connection.ConnectionParams import dorkbox.network.connection.EndPoint import io.aeron.FragmentAssembler import io.aeron.Image -import io.aeron.Publication import io.aeron.logbuffer.Header import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking @@ -42,9 +40,6 @@ import org.agrona.DirectBuffer import java.net.Inet4Address internal object ServerHandshakePollers { - // session IDs are unique for a entire driver! - val sessionIdMap = LockFreeIntMap() - fun disabled(serverInfo: String): AeronPoller { return object : AeronPoller { override fun poll(): Int { return 0 } @@ -75,20 +70,13 @@ internal object ServerHandshakePollers { if (message !is HandshakeMessage) { logger.error { "[$aeronLogInfo] Connection not allowed! Invalid connection request" } } else { - var publication: Publication? = sessionIdMap[message.sessionId] - if (publication == null) { - // we create a NEW publication for the handshake, which connects directly to the client handshake subscription - val publicationUri = uri("ipc", message.sessionId, true) + // we create a NEW publication for the handshake, which connects directly to the client handshake subscription + val publicationUri = uri("ipc", message.sessionId, true) - publication = try { - driver.addPublication(publicationUri, "HANDSHAKE-IPC", message.streamId) - } catch (e: Exception) { - logger.error(e) { "Cannot create IPC publication back to remote process" } - null - } - } - - if (publication == null) { + val publication = try { + driver.addPublication(publicationUri, "HANDSHAKE-IPC", message.streamId) + } catch (e: Exception) { + logger.error(e) { "Cannot create IPC publication back to remote process" } return } @@ -115,7 +103,6 @@ internal object ServerHandshakePollers { logger = logger)) { driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC") - sessionIdMap.remove(message.sessionId) return } @@ -128,6 +115,8 @@ internal object ServerHandshakePollers { connectionFunc = connectionFunc, logger = logger ) + + driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC") } else { logger.error { "Cannot comm back to remote process" } } @@ -175,14 +164,9 @@ internal object ServerHandshakePollers { } val isRemoteIpv4 = clientAddress is Inet4Address - val type: String - - if (isRemoteIpv4) { - type = "IPv4" - } else { + if (!isRemoteIpv4) { // this is necessary to clean up the address when adding it to aeron, since different formats mess it up clientAddressString = IP.toString(clientAddress) - type = "IPv6" } @@ -203,12 +187,10 @@ internal object ServerHandshakePollers { // we create a NEW publication for the handshake, which connects directly to the client handshake subscription CONTROL (which then goes to the proper endpoint) val publicationUri = uri("udp", message.sessionId, isReliable) -// .controlEndpoint(isRemoteIpv4, properPubAddress, port) -// .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) .endpoint(isRemoteIpv4, properPubAddress, message.port) val publication = try { - driver.addPublication(publicationUri, type, message.streamId) + driver.addPublication(publicationUri, logInfo, message.streamId) } catch (e: Exception) { logger.error(e) { "Cannot create publication back to $clientAddressString" } return @@ -234,21 +216,23 @@ internal object ServerHandshakePollers { handshakePublication = publication, message = message, logger = logger)) { + // publications are REMOVED from Aeron clients when their linger timeout has expired!!! + driver.closeAndDeletePublication(publication, logInfo) + return } handshake.processUdpHandshakeMessageServer( - server = server, - mediaDriver = mediaDriver, - driver = driver, - handshakePublication = publication, - clientAddress = clientAddress, - clientAddressString = clientAddressString, - isReliable = isReliable, - message = message, - aeronLogInfo = aeronLogInfo, - connectionFunc = connectionFunc, - logger = logger + server = server, + mediaDriver = mediaDriver, + handshakePublication = publication, + clientAddress = clientAddress, + clientAddressString = clientAddressString, + isReliable = isReliable, + message = message, + aeronLogInfo = aeronLogInfo, + connectionFunc = connectionFunc, + logger = logger ) } else { logger.error { "Cannot create $logInfo publication back to '$clientAddressString'" } @@ -270,7 +254,7 @@ internal object ServerHandshakePollers { val driver = ServerIpcHandshakeDriver( aeronDriver = server.aeronDriver, streamIdSub = config.ipcId, - sessionIdSub = AeronDriver.HANDSHAKE_SESSION_ID, + sessionIdSub = AeronDriver.RESERVED_SESSION_ID_INVALID, logger = logger ) @@ -312,15 +296,14 @@ internal object ServerHandshakePollers { val connectionFunc = server.connectionFunc val config = server.config val isReliable = config.isReliable - val pubPort = config.port + 1 + val pubPort = config.port val poller = if (server.canUseIPv4) { val driver = ServerUdpHandshakeDriver( aeronDriver = server.aeronDriver, - listenAddress = server.listenIPv6Address!!, + listenAddress = server.listenIPv4Address!!, port = config.port, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronDriver.HANDSHAKE_SESSION_ID, connectionTimeoutSec = config.connectionCloseTimeoutInSeconds, isReliable = isReliable, logInfo = "HANDSHAKE-IPv4", @@ -369,7 +352,7 @@ internal object ServerHandshakePollers { val connectionFunc = server.connectionFunc val config = server.config val isReliable = config.isReliable - val pubPort = config.port + 1 + val pubPort = config.port val poller = if (server.canUseIPv6) { val driver = ServerUdpHandshakeDriver( @@ -377,7 +360,6 @@ internal object ServerHandshakePollers { listenAddress = server.listenIPv6Address!!, port = config.port, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronDriver.HANDSHAKE_SESSION_ID, connectionTimeoutSec = config.connectionCloseTimeoutInSeconds, isReliable = isReliable, logInfo = "HANDSHAKE-IPv6", @@ -427,7 +409,7 @@ internal object ServerHandshakePollers { val connectionFunc = server.connectionFunc val config = server.config val isReliable = config.isReliable - val pubPort = config.port + 1 + val pubPort = config.port val poller = try { val driver = ServerUdpHandshakeDriver( @@ -435,7 +417,6 @@ internal object ServerHandshakePollers { listenAddress = server.listenIPv6Address!!, port = config.port, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronDriver.HANDSHAKE_SESSION_ID, connectionTimeoutSec = config.connectionCloseTimeoutInSeconds, isReliable = isReliable, logInfo = "HANDSHAKE-IPv4+6",