Fixed issues getting the subscription bind address.

This commit is contained in:
Robinson 2022-08-03 01:52:30 +02:00
parent 77957a8943
commit 6d519227df
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 41 additions and 33 deletions

View File

@ -44,6 +44,8 @@ internal open class ClientIpcDriver(streamId: Int,
var success: Boolean = false
override val type = "ipc"
override val subscriptionPort: Int = localSessionId
/**
* Set up the subscription + publication channels to the server
*

View File

@ -16,6 +16,8 @@
package dorkbox.network.aeron.mediaDriver
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection.Companion.uriEndpoint
import dorkbox.network.connection.ListenerManager
@ -24,6 +26,7 @@ import dorkbox.network.exceptions.ClientTimedOutException
import mu.KLogger
import java.lang.Thread.sleep
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.util.concurrent.*
@ -49,6 +52,16 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri
}
}
override val subscriptionPort: Int by lazy {
val addressesAndPorts = subscription.localSocketAddresses()
val first = addressesAndPorts.first()
// split
val splitPoint = first.lastIndexOf(':')
val port = first.substring(splitPoint+1)
port.toInt()
}
/**
* @throws ClientRetryException if we need to retry to connect
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
@ -66,23 +79,32 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uriEndpoint("udp", remoteSessionId, isReliable, address, addressString, port)
// Create a subscription the given address and port, using the given stream ID.
val subscriptionUri = uriEndpoint("udp", localSessionId, isReliable, address, addressString, 0)
if (logger.isTraceEnabled) {
logger.trace("client sub URI: $type ${subscriptionUri.build()}")
logger.trace("client pub URI: $type ${publicationUri.build()}")
}
logger.trace("client pub URI: $type ${publicationUri.build()}")
// For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions.
// ESPECIALLY if it is with the same streamID. This was noticed as a problem with IPC
val publication = aeronDriver.addPublication(publicationUri, streamId)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamId)
val localAddresses = publication.localSocketAddresses().first()
// split
val splitPoint = localAddresses.lastIndexOf(':')
val localAddressString = localAddresses.substring(0, splitPoint)
// the subscription here is WILDCARD
val localAddress = if (address is Inet6Address) {
IPv6.toAddress(localAddressString)!!
} else {
IPv4.toAddress(localAddressString)!!
}
// Create a subscription the given address and port, using the given stream ID.
val subscriptionUri = uriEndpoint("udp", localSessionId, isReliable, localAddress, localAddressString, 0)
logger.trace("client sub URI: $type ${subscriptionUri.build()}")
val subscription = aeronDriver.addSubscription(subscriptionUri, streamId)
// always include the linger timeout, so we don't accidentally kill ourself by taking too long
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs()
val startTime = System.nanoTime()

View File

@ -17,7 +17,6 @@
package dorkbox.network.aeron.mediaDriver
import dorkbox.util.logger
import io.aeron.Publication
import io.aeron.Subscription
@ -31,21 +30,5 @@ abstract class MediaDriverClient(val port: Int,
lateinit var subscription: Subscription
lateinit var publication: Publication
val subscriptionPort: Int by lazy {
if (this is ClientIpcDriver) {
localSessionId
} else {
val addressesAndPorts = subscription.localSocketAddresses()
if (addressesAndPorts.size > 1) {
logger().error { "Subscription ports for client is MORE than 1. This is 'ok', but we only support the use the first one!" }
}
val first = addressesAndPorts.first()
// split
val splitPoint = first.lastIndexOf(':')
val port = first.substring(splitPoint+1)
port.toInt()
}
}
abstract val subscriptionPort: Int
}

View File

@ -110,8 +110,9 @@ object AeronClient {
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = true
// configuration.enableIpc = false
// configuration.enableIpc = true
configuration.enableIpc = false
configuration.enableIPv4 = true
// configuration.enableIPv4 = false
// configuration.enableIPv6 = true
// configuration.uniqueAeronDirectory = true

View File

@ -92,9 +92,9 @@ object AeronServer {
configuration.port = 2000
configuration.maxClientCount = 50
configuration.enableIpc = true
// configuration.enableIpc = true
// configuration.enableIpc = false
configuration.enableIPv4 = false
// configuration.enableIPv4 = false
configuration.enableIPv6 = false
configuration.maxConnectionsPerIpAddress = 50