Moved localAddressString logic the Driver

This commit is contained in:
Robinson 2023-06-29 23:50:22 +02:00
parent 0cc8828ce8
commit 2a1c303c6a
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 31 additions and 14 deletions

View File

@ -45,8 +45,6 @@ import org.agrona.concurrent.ringbuffer.RingBufferDescriptor
import org.agrona.concurrent.status.CountersReader
import org.slf4j.Logger
import java.io.File
import java.net.Inet6Address
import java.net.InetAddress
import java.util.concurrent.*
fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, port: Int): ChannelUriStringBuilder {
@ -343,21 +341,40 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
/**
* This will return the local-address of the interface that connects with the remote address (instead of on ALL interfaces)
*/
fun getLocalAddressString(publication: Publication, remoteAddress: InetAddress): String {
val localAddresses = publication.localSocketAddresses().first()
fun getLocalAddressString(publication: Publication, isRemoteIpv4: Boolean): String {
val localSocketAddress = publication.localSocketAddresses()
if (localSocketAddress == null || localSocketAddress.isEmpty()) {
throw Exception("The local socket address for the publication ${publication.channel()} is null/empty.")
}
val localAddresses = localSocketAddress.first()
val splitPoint = localAddresses.lastIndexOf(':')
var localAddressString = localAddresses.substring(0, splitPoint)
return if (remoteAddress is Inet6Address) {
return if (isRemoteIpv4) {
localAddressString
} else {
// this is necessary to clean up the address when adding it to aeron, since different formats mess it up
// aeron IPv6 addresses all have [...]
localAddressString = localAddressString.substring(1, localAddressString.length-1)
IPv6.toString(IPv6.toAddress(localAddressString)!!)
} else {
localAddressString
}
}
/**
* This will return the local-address of the interface that connects with the remote address (instead of on ALL interfaces)
*/
fun getLocalAddressString(subscription: Subscription): String {
val localSocketAddress = subscription.localSocketAddresses()
if (localSocketAddress == null || localSocketAddress.isEmpty()) {
throw Exception("The local socket address for the subscription ${subscription.channel()} is null/empty.")
}
val addressesAndPorts = localSocketAddress.first()
val splitPoint2 = addressesAndPorts.lastIndexOf(':')
return addressesAndPorts.substring(0, splitPoint2)
}
internal fun getDriverConfig(config: Configuration, logger: KLogger): Configuration.MediaDriverConfig {

View File

@ -185,7 +185,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
}
// this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces.
val localAddressString = getLocalAddressString(publication, remoteAddress)
val localAddressString = getLocalAddressString(publication, isRemoteIpv4)
// A control endpoint for the subscriptions will cause a periodic service management "heartbeat" to be sent to the

View File

@ -141,8 +141,6 @@ internal class ClientHandshakeDriver(
"HANDSHAKE-IPv6"
}
streamIdPub = AeronDriver.UDP_HANDSHAKE_STREAM_ID
@ -162,9 +160,11 @@ internal class ClientHandshakeDriver(
// we have to figure out what our sub port info is, otherwise the server cannot connect back!
val addressesAndPorts = pubSub.sub.localSocketAddresses().first()
val splitPoint2 = addressesAndPorts.lastIndexOf(':')
val subscriptionAddress = addressesAndPorts.substring(0, splitPoint2)
val subscriptionAddress = try {
getLocalAddressString(pubSub.sub)
} catch (e: Exception) {
throw ClientRetryException("$logInfo subscription is not properly created!", e)
}
details = if (subscriptionAddress == remoteAddressString) {
logInfo
@ -262,7 +262,7 @@ internal class ClientHandshakeDriver(
// this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces.
val localAddressString = getLocalAddressString(publication, remoteAddress)
val localAddressString = getLocalAddressString(publication, isRemoteIpv4)
// Create a subscription the given address and port, using the given stream ID.