Connection timeouts are now in seconds, instead of milliseconds

This commit is contained in:
Robinson 2022-03-08 08:41:06 +01:00
parent 2a14d09a2c
commit 06be3bee28
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
9 changed files with 73 additions and 59 deletions

View File

@ -16,11 +16,20 @@
package dorkbox.network
import dorkbox.bytes.toHexString
import dorkbox.netUtil.*
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.netUtil.Inet4
import dorkbox.netUtil.Inet6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.connection.*
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.connection.eventLoop
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRejectedException
@ -90,7 +99,6 @@ open class Client<CONNECTION : Connection>(
// is valid when there is a connection to the server, otherwise it is null
private var connection0: CONNECTION? = null
private val previousClosedConnectionActivity: Long = 0
// This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper
// lock-stop ordering for how disconnect and connect work with each-other
@ -120,7 +128,7 @@ open class Client<CONNECTION : Connection>(
* ### Case does not matter, and "localhost" is the default.
*
* @param remoteAddress The network host or ip address
* @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?).
*
* @throws IllegalArgumentException if the remote address is invalid
@ -129,18 +137,18 @@ open class Client<CONNECTION : Connection>(
*/
@Suppress("BlockingMethodInNonBlockingContext")
fun connect(remoteAddress: String = "",
connectionTimeoutMS: Long = 30_000L,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true) {
when {
// this is default IPC settings
remoteAddress.isEmpty() && config.enableIpc == true -> {
connectIpc(connectionTimeoutMS = connectionTimeoutMS)
connectIpc(connectionTimeoutSec = connectionTimeoutSec)
}
IPv4.isPreferred -> {
connect(
remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
@ -148,7 +156,7 @@ open class Client<CONNECTION : Connection>(
IPv6.isPreferred -> {
connect(
remoteAddress = Inet6.toAddress(remoteAddress),
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
@ -157,7 +165,7 @@ open class Client<CONNECTION : Connection>(
else -> {
connect(
remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
@ -182,7 +190,7 @@ open class Client<CONNECTION : Connection>(
* ### Case does not matter, and "localhost" is the default.
*
* @param remoteAddress The network or if localhost, IPC address for the client to connect to
* @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?).
*
* @throws IllegalArgumentException if the remote address is invalid
@ -190,14 +198,14 @@ open class Client<CONNECTION : Connection>(
* @throws ClientRejectedException if the client connection is rejected
*/
fun connect(remoteAddress: InetAddress,
connectionTimeoutMS: Long = 30_000L,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true) {
// Default IPC ports are flipped because they are in the perspective of the SERVER
connect(remoteAddress = remoteAddress,
ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
}
@ -206,7 +214,7 @@ open class Client<CONNECTION : Connection>(
*
* @param ipcPublicationId The IPC publication address for the client to connect to
* @param ipcSubscriptionId The IPC subscription address for the client to connect to
* @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely.
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
*
* @throws IllegalArgumentException if the remote address is invalid
* @throws ClientTimedOutException if the client is unable to connect in x amount of time
@ -215,7 +223,7 @@ open class Client<CONNECTION : Connection>(
@Suppress("DuplicatedCode")
fun connectIpc(ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L) {
connectionTimeoutSec: Int = 30) {
// Default IPC ports are flipped because they are in the perspective of the SERVER
require(ipcPublicationId != ipcSubscriptionId) { "IPC publication and subscription ports cannot be the same! The must match the server's configuration." }
@ -223,7 +231,7 @@ open class Client<CONNECTION : Connection>(
connect(remoteAddress = null, // required!
ipcPublicationId = ipcPublicationId,
ipcSubscriptionId = ipcSubscriptionId,
connectionTimeoutMS = connectionTimeoutMS)
connectionTimeoutSec = connectionTimeoutSec)
}
/**
@ -248,7 +256,7 @@ open class Client<CONNECTION : Connection>(
* @param remoteAddress The network or if localhost, IPC address for the client to connect to
* @param ipcPublicationId The IPC publication address for the client to connect to
* @param ipcSubscriptionId The IPC subscription address for the client to connect to
* @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely.
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?).
*
* @throws IllegalArgumentException if the remote address is invalid
@ -261,9 +269,9 @@ open class Client<CONNECTION : Connection>(
// Default IPC ports are flipped because they are in the perspective of the SERVER
ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true) {
require(connectionTimeoutMS >= 0) { "connectionTimeoutMS '$connectionTimeoutMS' is invalid. It must be >=0" }
require(connectionTimeoutSec >= 0) { "connectionTimeoutMS '$connectionTimeoutSec' is invalid. It must be >=0" }
if (isConnected) {
logger.error("Unable to connect when already connected!")
@ -336,7 +344,7 @@ open class Client<CONNECTION : Connection>(
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
@ -351,7 +359,7 @@ open class Client<CONNECTION : Connection>(
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
@ -367,7 +375,7 @@ open class Client<CONNECTION : Connection>(
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
val connectionInfo = try {
handshake.handshakeHello(handshakeConnection, connectionTimeoutMS)
handshake.handshakeHello(handshakeConnection, connectionTimeoutSec)
} catch (e: Exception) {
logger.error("Handshake error", e)
throw e
@ -410,7 +418,7 @@ open class Client<CONNECTION : Connection>(
subscriptionPort = connectionInfo.publicationPort,
streamId = connectionInfo.streamId,
sessionId = connectionInfo.sessionId,
connectionTimeoutMS = connectionTimeoutMS,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = handshakeConnection.isReliable)
}
@ -498,7 +506,7 @@ open class Client<CONNECTION : Connection>(
// tell the server our connection handshake is done, and the connection can now listen for data.
// also closes the handshake (will also throw connect timeout exception)
val canFinishConnecting = try {
handshake.handshakeDone(handshakeConnection, connectionTimeoutMS)
handshake.handshakeDone(handshakeConnection, connectionTimeoutSec)
} catch (e: ClientException) {
logger.error("Error during handshake", e)
false

View File

@ -15,7 +15,11 @@
*/
package dorkbox.network
import dorkbox.netUtil.*
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.netUtil.Inet4
import dorkbox.netUtil.Inet6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.IpcMediaDriverConnection
@ -38,8 +42,7 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer
import java.net.InetAddress
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
/**
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
@ -223,7 +226,7 @@ open class Server<CONNECTION : Connection>(
subscriptionPort = config.subscriptionPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds)
driver.buildServer(aeronDriver, logger)
@ -311,7 +314,7 @@ open class Server<CONNECTION : Connection>(
subscriptionPort = config.subscriptionPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds)
driver.buildServer(aeronDriver, logger)
@ -398,7 +401,7 @@ open class Server<CONNECTION : Connection>(
subscriptionPort = config.subscriptionPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds)
driver.buildServer(aeronDriver, logger)

View File

@ -20,6 +20,7 @@ import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import mu.KLogger
import java.lang.Thread.sleep
import java.util.concurrent.*
/**
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
@ -61,25 +62,25 @@ internal open class IpcMediaDriverConnection(streamId: Int,
logger.trace("IPC server sub URI: ${subscriptionUri.build()}")
}
var success = false
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
var startTime = System.currentTimeMillis()
var success = false
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription)
// this will wait for the server to acknowledge the connection (all via aeron)
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected && subscription.imageCount() > 0) {
success = true
break
}
sleep(100L)
sleep(500L)
}
@ -93,13 +94,13 @@ internal open class IpcMediaDriverConnection(streamId: Int,
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) {
success = true
break
}
sleep(100L)
sleep(500L)
}
if (!success) {

View File

@ -17,7 +17,6 @@
package dorkbox.network.aeron
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Publication
import io.aeron.Subscription
import mu.KLogger
@ -25,7 +24,7 @@ import mu.KLogger
abstract class MediaDriverConnection(
val publicationPort: Int, val subscriptionPort: Int,
val streamId: Int, val sessionId: Int,
val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable {
val connectionTimeoutSec: Int, val isReliable: Boolean) : AutoCloseable {
lateinit var subscription: Subscription
lateinit var publication: Publication

View File

@ -25,7 +25,7 @@ import mu.KLogger
import java.lang.Thread.sleep
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
/**
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
@ -36,9 +36,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
connectionTimeoutSec: Int = 0,
isReliable: Boolean = true) :
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
var success: Boolean = false
@ -110,7 +110,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS)
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected) {
@ -118,7 +118,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
break
}
sleep(100L)
sleep(500L)
}
if (!success) {
@ -139,7 +139,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
break
}
sleep(100L)
sleep(500L)
}
if (!success) {

View File

@ -18,8 +18,8 @@ package dorkbox.network.aeron
abstract class UdpMediaDriverConnection(publicationPort: Int, subscriptionPort: Int,
streamId: Int, sessionId: Int,
connectionTimeoutMS: Long, isReliable: Boolean) :
connectionTimeoutSec: Int, isReliable: Boolean) :
MediaDriverConnection(publicationPort, subscriptionPort,
streamId, sessionId,
connectionTimeoutMS, isReliable) {
connectionTimeoutSec, isReliable) {
}

View File

@ -29,9 +29,9 @@ internal class UdpMediaDriverPairedConnection(listenAddress: InetAddress,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
connectionTimeoutSec: Int = 0,
isReliable: Boolean = true) :
UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
override fun toString(): String {
return "$remoteAddressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"

View File

@ -34,9 +34,9 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
connectionTimeoutSec: Int = 0,
isReliable: Boolean = true) :
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
var success: Boolean = false

View File

@ -26,6 +26,7 @@ import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import mu.KLogger
import org.agrona.DirectBuffer
import java.util.concurrent.*
internal class ClientHandshake<CONNECTION: Connection>(
private val crypto: CryptoManagement,
@ -79,7 +80,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
return@FragmentAssembler
}
// this is an retry message
// this is a retry message
// this can happen if there are multiple connections from the SAME ip address (ie: localhost)
if (message.state == HandshakeMessage.RETRY) {
needToRetry = true
@ -143,7 +144,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long) : ClientConnectionInfo {
fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
failed = false
oneTimeKey = endPoint.crypto.secureRandom.nextInt()
val publicKey = endPoint.storage.getPublicKey()!!
@ -163,8 +164,9 @@ internal class ClientHandshake<CONNECTION: Connection>(
// block until we receive the connection information from the server
var pollCount: Int
val startTime = System.currentTimeMillis()
while (connectionTimeoutMS == 0L || System.currentTimeMillis() - startTime < connectionTimeoutMS) {
val startTime = System.nanoTime()
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1)
@ -192,7 +194,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
fun handshakeDone(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long): Boolean {
fun handshakeDone(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int): Boolean {
val registrationMessage = HandshakeMessage.doneFromClient(oneTimeKey)
// Send the done message to the server.
@ -210,8 +212,9 @@ internal class ClientHandshake<CONNECTION: Connection>(
val subscription = handshakeConnection.subscription
val pollIdleStrategy = endPoint.pollIdleStrategyHandShake
var startTime = System.currentTimeMillis()
while (connectionTimeoutMS == 0L || System.currentTimeMillis() - startTime < connectionTimeoutMS) {
val timoutInNanos = TimeUnit.SECONDS.toMillis(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1)
@ -224,7 +227,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
needToRetry = false
// start over with the timeout!
startTime = System.currentTimeMillis()
startTime = System.nanoTime()
}
// 0 means we idle. >0 means reset and don't idle (because there are likely more)