Code cleanup

This commit is contained in:
Robinson 2023-06-25 17:21:25 +02:00
parent 3a9e9fea71
commit 9a3e49bca4
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 20 additions and 17 deletions

View File

@ -23,6 +23,7 @@ import dorkbox.network.aeron.endpoint
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.handshake.ClientConnectionInfo
import io.aeron.CommonContext
import java.net.Inet4Address
import java.net.InetAddress
@ -120,7 +121,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// Create a publication at the given address and port, using the given stream ID.
val publicationUri = uri("ipc", sessionIdPub, reliable)
val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable)
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
@ -130,7 +131,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
}
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uri("ipc", sessionIdSub, reliable)
val subscriptionUri = uri(CommonContext.IPC_MEDIA, sessionIdSub, reliable)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)
return PubSub(publication, subscription,
@ -160,7 +161,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// Create a publication at the given address and port, using the given stream ID.
val publicationUri = uri("udp", sessionIdPub, reliable)
val publicationUri = uri(CommonContext.UDP_MEDIA, sessionIdPub, reliable)
.endpoint(isRemoteIpv4, remoteAddressString, portPub)
@ -174,7 +175,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
// this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces.
val localAddressString = getLocalAddressString(publication, remoteAddress)
val subscriptionUri = uri("udp", sessionIdSub, reliable)
val subscriptionUri = uri(CommonContext.UDP_MEDIA, sessionIdSub, reliable)
.endpoint(isRemoteIpv4, localAddressString, portSub)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)

View File

@ -23,12 +23,14 @@ import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
import dorkbox.network.aeron.AeronDriver.Companion.uri
import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake
import dorkbox.network.aeron.endpoint
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.CommonContext
import io.aeron.Subscription
import mu.KLogger
import java.net.Inet4Address
@ -75,10 +77,8 @@ internal class ClientHandshakeDriver(
var details = ""
// for UDP, this must be unique otherwise we CANNOT connect to the server!
// additionally, this must ONLY be unique per driver (not per connection!) If it is unique PER connection,
// there will be sessionID errors (because the handshake connections are created too quickly during reconnects
val sessionIdPub = aeronDriver.clientUdpHandshakeSessionId
// this must be unique otherwise we CANNOT connect to the server!
val sessionIdPub = CryptoManagement.secureRandom.nextInt()
// with IPC, the aeron driver MUST be shared, so having a UNIQUE sessionIdPub/Sub is unnecessary.
// sessionIdPub = sessionIdAllocator.allocate()
@ -186,7 +186,7 @@ internal class ClientHandshakeDriver(
): PubSub {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri("ipc", sessionIdPub, reliable)
val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable)
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
@ -201,7 +201,7 @@ internal class ClientHandshakeDriver(
}
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uriHandshake("ipc", reliable)
val subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, reliable)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)
return PubSub(publication, subscription,
@ -231,7 +231,7 @@ internal class ClientHandshakeDriver(
// Create a publication at the given address and port, using the given stream ID.
// ANY sessionID for the publication will work, because the SERVER doesn't have it defined
val publicationUri = uri("udp", sessionIdPub, reliable)
val publicationUri = uri(CommonContext.UDP_MEDIA, sessionIdPub, reliable)
.endpoint(isRemoteIpv4, remoteAddressString, portPub)
@ -266,7 +266,7 @@ internal class ClientHandshakeDriver(
}
try {
val subscriptionUri = uriHandshake("udp", reliable)
val subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, reliable)
.endpoint(isRemoteIpv4, localAddressString, actualPortSub)
subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)

View File

@ -19,6 +19,7 @@ package dorkbox.network.aeron.mediaDriver
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.uri
import dorkbox.network.connection.IpInfo
import io.aeron.CommonContext
import java.net.Inet4Address
import java.net.InetAddress
@ -86,14 +87,14 @@ internal class ServerConnectionDriver(val pubSub: PubSub) {
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// create a new publication for the connection (since the handshake ALWAYS closes the current publication)
val publicationUri = uri("ipc", sessionIdPub, reliable)
val publicationUri = uri(CommonContext.IPC_MEDIA, sessionIdPub, reliable)
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
val publication = aeronDriver.addPublication(publicationUri, streamIdPub, logInfo)
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uri("ipc", sessionIdSub, reliable)
val subscriptionUri = uri(CommonContext.IPC_MEDIA, sessionIdSub, reliable)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)
return PubSub(publication, subscription,
@ -130,7 +131,7 @@ internal class ServerConnectionDriver(val pubSub: PubSub) {
// if we are IPv6 WILDCARD -- then our subscription must ALSO be IPv6, even if our connection is via IPv4
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uri("udp", sessionIdSub, reliable)
val subscriptionUri = uri(CommonContext.UDP_MEDIA, sessionIdSub, reliable)
.endpoint(ipInfo.formattedListenAddressString + ":" + portSub)

View File

@ -20,6 +20,7 @@ import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake
import dorkbox.network.connection.IpInfo
import io.aeron.ChannelUriStringBuilder
import io.aeron.CommonContext
import io.aeron.Subscription
/**
@ -41,13 +42,13 @@ internal class ServerHandshakeDriver(private val aeronDriver: AeronDriver, val s
val subscriptionUri: ChannelUriStringBuilder
if (isIpc) {
subscriptionUri = uriHandshake("ipc", isReliable)
subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable)
info = "$logInfo [$sessionIdSub|$streamIdSub]"
} else {
val port = ipInfo.port
// are we ipv4 or ipv6 or ipv6wildcard?
subscriptionUri = uriHandshake("udp", isReliable)
subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, isReliable)
.endpoint(ipInfo.getAeronPubAddress(ipInfo.isIpv4) + ":" + port)
info = "$logInfo ${ipInfo.listenAddressStringPretty} [$sessionIdSub|$streamIdSub|$port] (reliable:$isReliable)"