diff --git a/src/dorkbox/network/aeron/mediaDriver/ClientConnectionDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ClientConnectionDriver.kt index bb143d9e..0af3b646 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ClientConnectionDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ClientConnectionDriver.kt @@ -23,6 +23,7 @@ import dorkbox.network.aeron.endpoint import dorkbox.network.exceptions.ClientRetryException import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.handshake.ClientConnectionInfo +import io.aeron.CommonContext import java.net.Inet4Address import java.net.InetAddress @@ -120,7 +121,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) { // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // Create a publication at the given address and port, using the given stream ID. - val publicationUri = uri("ipc", sessionIdPub, reliable) + val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable) // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // publication of any state to other threads and not be long running or re-entrant with the client. @@ -130,7 +131,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) { } // Create a subscription at the given address and port, using the given stream ID. - val subscriptionUri = uri("ipc", sessionIdSub, reliable) + val subscriptionUri = uri(CommonContext.IPC_MEDIA, sessionIdSub, reliable) val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) return PubSub(publication, subscription, @@ -160,7 +161,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) { // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // Create a publication at the given address and port, using the given stream ID. - val publicationUri = uri("udp", sessionIdPub, reliable) + val publicationUri = uri(CommonContext.UDP_MEDIA, sessionIdPub, reliable) .endpoint(isRemoteIpv4, remoteAddressString, portPub) @@ -174,7 +175,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) { // this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces. val localAddressString = getLocalAddressString(publication, remoteAddress) - val subscriptionUri = uri("udp", sessionIdSub, reliable) + val subscriptionUri = uri(CommonContext.UDP_MEDIA, sessionIdSub, reliable) .endpoint(isRemoteIpv4, localAddressString, portSub) val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) diff --git a/src/dorkbox/network/aeron/mediaDriver/ClientHandshakeDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ClientHandshakeDriver.kt index e758c775..07c29e7d 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ClientHandshakeDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ClientHandshakeDriver.kt @@ -23,12 +23,14 @@ import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.uri import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake import dorkbox.network.aeron.endpoint +import dorkbox.network.connection.CryptoManagement import dorkbox.network.connection.EndPoint import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientRetryException import dorkbox.network.exceptions.ClientTimedOutException +import io.aeron.CommonContext import io.aeron.Subscription import mu.KLogger import java.net.Inet4Address @@ -75,10 +77,8 @@ internal class ClientHandshakeDriver( var details = "" - // for UDP, this must be unique otherwise we CANNOT connect to the server! - // additionally, this must ONLY be unique per driver (not per connection!) If it is unique PER connection, - // there will be sessionID errors (because the handshake connections are created too quickly during reconnects - val sessionIdPub = aeronDriver.clientUdpHandshakeSessionId + // this must be unique otherwise we CANNOT connect to the server! + val sessionIdPub = CryptoManagement.secureRandom.nextInt() // with IPC, the aeron driver MUST be shared, so having a UNIQUE sessionIdPub/Sub is unnecessary. // sessionIdPub = sessionIdAllocator.allocate() @@ -186,7 +186,7 @@ internal class ClientHandshakeDriver( ): PubSub { // Create a publication at the given address and port, using the given stream ID. // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri("ipc", sessionIdPub, reliable) + val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable) // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // publication of any state to other threads and not be long running or re-entrant with the client. @@ -201,7 +201,7 @@ internal class ClientHandshakeDriver( } // Create a subscription at the given address and port, using the given stream ID. - val subscriptionUri = uriHandshake("ipc", reliable) + val subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, reliable) val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) return PubSub(publication, subscription, @@ -231,7 +231,7 @@ internal class ClientHandshakeDriver( // Create a publication at the given address and port, using the given stream ID. // ANY sessionID for the publication will work, because the SERVER doesn't have it defined - val publicationUri = uri("udp", sessionIdPub, reliable) + val publicationUri = uri(CommonContext.UDP_MEDIA, sessionIdPub, reliable) .endpoint(isRemoteIpv4, remoteAddressString, portPub) @@ -266,7 +266,7 @@ internal class ClientHandshakeDriver( } try { - val subscriptionUri = uriHandshake("udp", reliable) + val subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, reliable) .endpoint(isRemoteIpv4, localAddressString, actualPortSub) subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerConnectionDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerConnectionDriver.kt index b039dc3e..6115e851 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerConnectionDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerConnectionDriver.kt @@ -19,6 +19,7 @@ package dorkbox.network.aeron.mediaDriver import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver.Companion.uri import dorkbox.network.connection.IpInfo +import io.aeron.CommonContext import java.net.Inet4Address import java.net.InetAddress @@ -86,14 +87,14 @@ internal class ServerConnectionDriver(val pubSub: PubSub) { // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // create a new publication for the connection (since the handshake ALWAYS closes the current publication) - val publicationUri = uri("ipc", sessionIdPub, reliable) + val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable) // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // publication of any state to other threads and not be long running or re-entrant with the client. val publication = aeronDriver.addPublication(publicationUri, streamIdPub, logInfo) // Create a subscription at the given address and port, using the given stream ID. - val subscriptionUri = uri("ipc", sessionIdSub, reliable) + val subscriptionUri = uri(CommonContext.IPC_MEDIA, sessionIdSub, reliable) val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) return PubSub(publication, subscription, @@ -130,7 +131,7 @@ internal class ServerConnectionDriver(val pubSub: PubSub) { // if we are IPv6 WILDCARD -- then our subscription must ALSO be IPv6, even if our connection is via IPv4 // Create a subscription at the given address and port, using the given stream ID. - val subscriptionUri = uri("udp", sessionIdSub, reliable) + val subscriptionUri = uri(CommonContext.UDP_MEDIA, sessionIdSub, reliable) .endpoint(ipInfo.formattedListenAddressString + ":" + portSub) diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerHandshakeDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerHandshakeDriver.kt index bd477ed0..0b6285a0 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerHandshakeDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerHandshakeDriver.kt @@ -20,6 +20,7 @@ import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake import dorkbox.network.connection.IpInfo import io.aeron.ChannelUriStringBuilder +import io.aeron.CommonContext import io.aeron.Subscription /** @@ -41,13 +42,13 @@ internal class ServerHandshakeDriver(private val aeronDriver: AeronDriver, val s val subscriptionUri: ChannelUriStringBuilder if (isIpc) { - subscriptionUri = uriHandshake("ipc", isReliable) + subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable) info = "$logInfo [$sessionIdSub|$streamIdSub]" } else { val port = ipInfo.port // are we ipv4 or ipv6 or ipv6wildcard? - subscriptionUri = uriHandshake("udp", isReliable) + subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, isReliable) .endpoint(ipInfo.getAeronPubAddress(ipInfo.isIpv4) + ":" + port) info = "$logInfo ${ipInfo.listenAddressStringPretty} [$sessionIdSub|$streamIdSub|$port] (reliable:$isReliable)"