From 2a1c303c6a622a33865503ee53f2a940851c9cea Mon Sep 17 00:00:00 2001 From: Robinson Date: Thu, 29 Jun 2023 23:50:22 +0200 Subject: [PATCH] Moved localAddressString logic the Driver --- src/dorkbox/network/aeron/AeronDriver.kt | 31 ++++++++++++++----- .../handshake/ClientConnectionDriver.kt | 2 +- .../handshake/ClientHandshakeDriver.kt | 12 +++---- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index ed51b893..2b983311 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -45,8 +45,6 @@ import org.agrona.concurrent.ringbuffer.RingBufferDescriptor import org.agrona.concurrent.status.CountersReader import org.slf4j.Logger import java.io.File -import java.net.Inet6Address -import java.net.InetAddress import java.util.concurrent.* fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, port: Int): ChannelUriStringBuilder { @@ -343,21 +341,40 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger /** * This will return the local-address of the interface that connects with the remote address (instead of on ALL interfaces) */ - fun getLocalAddressString(publication: Publication, remoteAddress: InetAddress): String { - val localAddresses = publication.localSocketAddresses().first() + fun getLocalAddressString(publication: Publication, isRemoteIpv4: Boolean): String { + val localSocketAddress = publication.localSocketAddresses() + if (localSocketAddress == null || localSocketAddress.isEmpty()) { + throw Exception("The local socket address for the publication ${publication.channel()} is null/empty.") + } + + val localAddresses = localSocketAddress.first() val splitPoint = localAddresses.lastIndexOf(':') var localAddressString = localAddresses.substring(0, splitPoint) - return if (remoteAddress is Inet6Address) { + return if (isRemoteIpv4) { + localAddressString + } else { // this is necessary to clean up the address when adding it to aeron, since different formats mess it up // aeron IPv6 addresses all have [...] localAddressString = localAddressString.substring(1, localAddressString.length-1) IPv6.toString(IPv6.toAddress(localAddressString)!!) - } else { - localAddressString } } + /** + * This will return the local-address of the interface that connects with the remote address (instead of on ALL interfaces) + */ + fun getLocalAddressString(subscription: Subscription): String { + val localSocketAddress = subscription.localSocketAddresses() + if (localSocketAddress == null || localSocketAddress.isEmpty()) { + throw Exception("The local socket address for the subscription ${subscription.channel()} is null/empty.") + } + + val addressesAndPorts = localSocketAddress.first() + val splitPoint2 = addressesAndPorts.lastIndexOf(':') + return addressesAndPorts.substring(0, splitPoint2) + } + internal fun getDriverConfig(config: Configuration, logger: KLogger): Configuration.MediaDriverConfig { diff --git a/src/dorkbox/network/handshake/ClientConnectionDriver.kt b/src/dorkbox/network/handshake/ClientConnectionDriver.kt index 5952f083..fd397301 100644 --- a/src/dorkbox/network/handshake/ClientConnectionDriver.kt +++ b/src/dorkbox/network/handshake/ClientConnectionDriver.kt @@ -185,7 +185,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) { } // this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces. - val localAddressString = getLocalAddressString(publication, remoteAddress) + val localAddressString = getLocalAddressString(publication, isRemoteIpv4) // A control endpoint for the subscriptions will cause a periodic service management "heartbeat" to be sent to the diff --git a/src/dorkbox/network/handshake/ClientHandshakeDriver.kt b/src/dorkbox/network/handshake/ClientHandshakeDriver.kt index 5e0a2c7c..a4816dec 100644 --- a/src/dorkbox/network/handshake/ClientHandshakeDriver.kt +++ b/src/dorkbox/network/handshake/ClientHandshakeDriver.kt @@ -141,8 +141,6 @@ internal class ClientHandshakeDriver( "HANDSHAKE-IPv6" } - - streamIdPub = AeronDriver.UDP_HANDSHAKE_STREAM_ID @@ -162,9 +160,11 @@ internal class ClientHandshakeDriver( // we have to figure out what our sub port info is, otherwise the server cannot connect back! - val addressesAndPorts = pubSub.sub.localSocketAddresses().first() - val splitPoint2 = addressesAndPorts.lastIndexOf(':') - val subscriptionAddress = addressesAndPorts.substring(0, splitPoint2) + val subscriptionAddress = try { + getLocalAddressString(pubSub.sub) + } catch (e: Exception) { + throw ClientRetryException("$logInfo subscription is not properly created!", e) + } details = if (subscriptionAddress == remoteAddressString) { logInfo @@ -262,7 +262,7 @@ internal class ClientHandshakeDriver( // this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces. - val localAddressString = getLocalAddressString(publication, remoteAddress) + val localAddressString = getLocalAddressString(publication, isRemoteIpv4) // Create a subscription the given address and port, using the given stream ID.