Cleaned up server handshakes

This commit is contained in:
Robinson 2023-05-26 15:41:21 +02:00
parent 784d0ecf02
commit 5d2e6ac551
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 38 additions and 50 deletions

View File

@ -23,8 +23,13 @@ import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
import dorkbox.network.aeron.mediaDriver.ServerIpcConnectionDriver
import dorkbox.network.aeron.mediaDriver.ServerUdpConnectionDriver
import dorkbox.network.aeron.mediaDriver.ServerUdpHandshakeDriver
import dorkbox.network.connection.*
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.EventDispatcher
import dorkbox.network.connection.EventDispatcher.Companion.EVENT
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.exceptions.AllocationException
import dorkbox.util.sync.CountDownLatch
import io.aeron.Publication
@ -33,7 +38,7 @@ import net.jodah.expiringmap.ExpirationPolicy
import net.jodah.expiringmap.ExpiringMap
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
/**
@ -375,7 +380,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
suspend fun processUdpHandshakeMessageServer(
server: Server<CONNECTION>,
mediaDriver: ServerUdpHandshakeDriver,
driver: AeronDriver,
handshakePublication: Publication,
clientAddress: InetAddress,
clientAddressString: String,
@ -521,8 +525,10 @@ internal class ServerHandshake<CONNECTION : Connection>(
listenAddress = mediaDriver.listenAddress,
remoteAddress = clientAddress,
remoteAddressString = clientAddressString,
portPub = portPub,
portSub = portSub,
logInfo = logType,
isReliable = isReliable,
logger = logger
@ -533,6 +539,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
"SERVER INFO:\n" +
"sessionId PUB: $connectionSessionIdPub\n" +
"sessionId SUB: $connectionSessionIdSub\n" +
"streamId PUB: $connectionStreamIdPub\n" +
"streamId SUB: $connectionStreamIdSub\n" +

View File

@ -18,7 +18,6 @@
package dorkbox.network.handshake
import dorkbox.collections.LockFreeIntMap
import dorkbox.netUtil.IP
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
@ -33,7 +32,6 @@ import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import io.aeron.FragmentAssembler
import io.aeron.Image
import io.aeron.Publication
import io.aeron.logbuffer.Header
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
@ -42,9 +40,6 @@ import org.agrona.DirectBuffer
import java.net.Inet4Address
internal object ServerHandshakePollers {
// session IDs are unique for a entire driver!
val sessionIdMap = LockFreeIntMap<Publication>()
fun disabled(serverInfo: String): AeronPoller {
return object : AeronPoller {
override fun poll(): Int { return 0 }
@ -75,20 +70,13 @@ internal object ServerHandshakePollers {
if (message !is HandshakeMessage) {
logger.error { "[$aeronLogInfo] Connection not allowed! Invalid connection request" }
} else {
var publication: Publication? = sessionIdMap[message.sessionId]
if (publication == null) {
// we create a NEW publication for the handshake, which connects directly to the client handshake subscription
val publicationUri = uri("ipc", message.sessionId, true)
publication = try {
val publication = try {
driver.addPublication(publicationUri, "HANDSHAKE-IPC", message.streamId)
} catch (e: Exception) {
logger.error(e) { "Cannot create IPC publication back to remote process" }
null
}
}
if (publication == null) {
return
}
@ -115,7 +103,6 @@ internal object ServerHandshakePollers {
logger = logger)) {
driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC")
sessionIdMap.remove(message.sessionId)
return
}
@ -128,6 +115,8 @@ internal object ServerHandshakePollers {
connectionFunc = connectionFunc,
logger = logger
)
driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC")
} else {
logger.error { "Cannot comm back to remote process" }
}
@ -175,14 +164,9 @@ internal object ServerHandshakePollers {
}
val isRemoteIpv4 = clientAddress is Inet4Address
val type: String
if (isRemoteIpv4) {
type = "IPv4"
} else {
if (!isRemoteIpv4) {
// this is necessary to clean up the address when adding it to aeron, since different formats mess it up
clientAddressString = IP.toString(clientAddress)
type = "IPv6"
}
@ -203,12 +187,10 @@ internal object ServerHandshakePollers {
// we create a NEW publication for the handshake, which connects directly to the client handshake subscription CONTROL (which then goes to the proper endpoint)
val publicationUri = uri("udp", message.sessionId, isReliable)
// .controlEndpoint(isRemoteIpv4, properPubAddress, port)
// .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
.endpoint(isRemoteIpv4, properPubAddress, message.port)
val publication = try {
driver.addPublication(publicationUri, type, message.streamId)
driver.addPublication(publicationUri, logInfo, message.streamId)
} catch (e: Exception) {
logger.error(e) { "Cannot create publication back to $clientAddressString" }
return
@ -234,13 +216,15 @@ internal object ServerHandshakePollers {
handshakePublication = publication,
message = message,
logger = logger)) {
// publications are REMOVED from Aeron clients when their linger timeout has expired!!!
driver.closeAndDeletePublication(publication, logInfo)
return
}
handshake.processUdpHandshakeMessageServer(
server = server,
mediaDriver = mediaDriver,
driver = driver,
handshakePublication = publication,
clientAddress = clientAddress,
clientAddressString = clientAddressString,
@ -270,7 +254,7 @@ internal object ServerHandshakePollers {
val driver = ServerIpcHandshakeDriver(
aeronDriver = server.aeronDriver,
streamIdSub = config.ipcId,
sessionIdSub = AeronDriver.HANDSHAKE_SESSION_ID,
sessionIdSub = AeronDriver.RESERVED_SESSION_ID_INVALID,
logger = logger
)
@ -312,15 +296,14 @@ internal object ServerHandshakePollers {
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
val pubPort = config.port + 1
val pubPort = config.port
val poller = if (server.canUseIPv4) {
val driver = ServerUdpHandshakeDriver(
aeronDriver = server.aeronDriver,
listenAddress = server.listenIPv6Address!!,
listenAddress = server.listenIPv4Address!!,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.HANDSHAKE_SESSION_ID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv4",
@ -369,7 +352,7 @@ internal object ServerHandshakePollers {
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
val pubPort = config.port + 1
val pubPort = config.port
val poller = if (server.canUseIPv6) {
val driver = ServerUdpHandshakeDriver(
@ -377,7 +360,6 @@ internal object ServerHandshakePollers {
listenAddress = server.listenIPv6Address!!,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.HANDSHAKE_SESSION_ID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv6",
@ -427,7 +409,7 @@ internal object ServerHandshakePollers {
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
val pubPort = config.port + 1
val pubPort = config.port
val poller = try {
val driver = ServerUdpHandshakeDriver(
@ -435,7 +417,6 @@ internal object ServerHandshakePollers {
listenAddress = server.listenIPv6Address!!,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.HANDSHAKE_SESSION_ID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv4+6",