diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 7f27559a..c7e427ac 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -16,11 +16,20 @@ package dorkbox.network import dorkbox.bytes.toHexString -import dorkbox.netUtil.* +import dorkbox.netUtil.IP +import dorkbox.netUtil.IPv4 +import dorkbox.netUtil.IPv6 +import dorkbox.netUtil.Inet4 +import dorkbox.netUtil.Inet6 import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverClientConnection -import dorkbox.network.connection.* +import dorkbox.network.connection.Connection +import dorkbox.network.connection.ConnectionParams +import dorkbox.network.connection.EndPoint +import dorkbox.network.connection.ListenerManager +import dorkbox.network.connection.PublicKeyValidationState +import dorkbox.network.connection.eventLoop import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientRejectedException @@ -90,7 +99,6 @@ open class Client( // is valid when there is a connection to the server, otherwise it is null private var connection0: CONNECTION? = null - private val previousClosedConnectionActivity: Long = 0 // This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper // lock-stop ordering for how disconnect and connect work with each-other @@ -120,7 +128,7 @@ open class Client( * ### Case does not matter, and "localhost" is the default. * * @param remoteAddress The network host or ip address - * @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely + * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely * @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?). * * @throws IllegalArgumentException if the remote address is invalid @@ -129,18 +137,18 @@ open class Client( */ @Suppress("BlockingMethodInNonBlockingContext") fun connect(remoteAddress: String = "", - connectionTimeoutMS: Long = 30_000L, + connectionTimeoutSec: Int = 30, reliable: Boolean = true) { when { // this is default IPC settings remoteAddress.isEmpty() && config.enableIpc == true -> { - connectIpc(connectionTimeoutMS = connectionTimeoutMS) + connectIpc(connectionTimeoutSec = connectionTimeoutSec) } IPv4.isPreferred -> { connect( remoteAddress = Inet4.toAddress(remoteAddress), - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, reliable = reliable ) } @@ -148,7 +156,7 @@ open class Client( IPv6.isPreferred -> { connect( remoteAddress = Inet6.toAddress(remoteAddress), - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, reliable = reliable ) } @@ -157,7 +165,7 @@ open class Client( else -> { connect( remoteAddress = Inet4.toAddress(remoteAddress), - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, reliable = reliable ) } @@ -182,7 +190,7 @@ open class Client( * ### Case does not matter, and "localhost" is the default. * * @param remoteAddress The network or if localhost, IPC address for the client to connect to - * @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely + * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely * @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?). * * @throws IllegalArgumentException if the remote address is invalid @@ -190,14 +198,14 @@ open class Client( * @throws ClientRejectedException if the client connection is rejected */ fun connect(remoteAddress: InetAddress, - connectionTimeoutMS: Long = 30_000L, + connectionTimeoutSec: Int = 30, reliable: Boolean = true) { // Default IPC ports are flipped because they are in the perspective of the SERVER connect(remoteAddress = remoteAddress, ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, reliable = reliable) } @@ -206,7 +214,7 @@ open class Client( * * @param ipcPublicationId The IPC publication address for the client to connect to * @param ipcSubscriptionId The IPC subscription address for the client to connect to - * @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely. + * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely. * * @throws IllegalArgumentException if the remote address is invalid * @throws ClientTimedOutException if the client is unable to connect in x amount of time @@ -215,7 +223,7 @@ open class Client( @Suppress("DuplicatedCode") fun connectIpc(ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, - connectionTimeoutMS: Long = 30_000L) { + connectionTimeoutSec: Int = 30) { // Default IPC ports are flipped because they are in the perspective of the SERVER require(ipcPublicationId != ipcSubscriptionId) { "IPC publication and subscription ports cannot be the same! The must match the server's configuration." } @@ -223,7 +231,7 @@ open class Client( connect(remoteAddress = null, // required! ipcPublicationId = ipcPublicationId, ipcSubscriptionId = ipcSubscriptionId, - connectionTimeoutMS = connectionTimeoutMS) + connectionTimeoutSec = connectionTimeoutSec) } /** @@ -248,7 +256,7 @@ open class Client( * @param remoteAddress The network or if localhost, IPC address for the client to connect to * @param ipcPublicationId The IPC publication address for the client to connect to * @param ipcSubscriptionId The IPC subscription address for the client to connect to - * @param connectionTimeoutMS wait for x milliseconds. 0 will wait indefinitely. + * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely. * @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?). * * @throws IllegalArgumentException if the remote address is invalid @@ -261,9 +269,9 @@ open class Client( // Default IPC ports are flipped because they are in the perspective of the SERVER ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, - connectionTimeoutMS: Long = 30_000L, + connectionTimeoutSec: Int = 30, reliable: Boolean = true) { - require(connectionTimeoutMS >= 0) { "connectionTimeoutMS '$connectionTimeoutMS' is invalid. It must be >=0" } + require(connectionTimeoutSec >= 0) { "connectionTimeoutMS '$connectionTimeoutSec' is invalid. It must be >=0" } if (isConnected) { logger.error("Unable to connect when already connected!") @@ -336,7 +344,7 @@ open class Client( subscriptionPort = config.publicationPort, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, isReliable = reliable) // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports @@ -351,7 +359,7 @@ open class Client( subscriptionPort = config.publicationPort, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, isReliable = reliable) // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports @@ -367,7 +375,7 @@ open class Client( // throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class) val connectionInfo = try { - handshake.handshakeHello(handshakeConnection, connectionTimeoutMS) + handshake.handshakeHello(handshakeConnection, connectionTimeoutSec) } catch (e: Exception) { logger.error("Handshake error", e) throw e @@ -410,7 +418,7 @@ open class Client( subscriptionPort = connectionInfo.publicationPort, streamId = connectionInfo.streamId, sessionId = connectionInfo.sessionId, - connectionTimeoutMS = connectionTimeoutMS, + connectionTimeoutSec = connectionTimeoutSec, isReliable = handshakeConnection.isReliable) } @@ -498,7 +506,7 @@ open class Client( // tell the server our connection handshake is done, and the connection can now listen for data. // also closes the handshake (will also throw connect timeout exception) val canFinishConnecting = try { - handshake.handshakeDone(handshakeConnection, connectionTimeoutMS) + handshake.handshakeDone(handshakeConnection, connectionTimeoutSec) } catch (e: ClientException) { logger.error("Error during handshake", e) false diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index a40c880f..65d6d27c 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -15,7 +15,11 @@ */ package dorkbox.network -import dorkbox.netUtil.* +import dorkbox.netUtil.IP +import dorkbox.netUtil.IPv4 +import dorkbox.netUtil.IPv6 +import dorkbox.netUtil.Inet4 +import dorkbox.netUtil.Inet6 import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.IpcMediaDriverConnection @@ -38,8 +42,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.agrona.DirectBuffer import java.net.InetAddress -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.TimeUnit +import java.util.concurrent.* /** * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the @@ -223,7 +226,7 @@ open class Server( subscriptionPort = config.subscriptionPort, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + connectionTimeoutSec = config.connectionCloseTimeoutInSeconds) driver.buildServer(aeronDriver, logger) @@ -311,7 +314,7 @@ open class Server( subscriptionPort = config.subscriptionPort, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + connectionTimeoutSec = config.connectionCloseTimeoutInSeconds) driver.buildServer(aeronDriver, logger) @@ -398,7 +401,7 @@ open class Server( subscriptionPort = config.subscriptionPort, streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + connectionTimeoutSec = config.connectionCloseTimeoutInSeconds) driver.buildServer(aeronDriver, logger) diff --git a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt index 2d9830d6..9dbae58d 100644 --- a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt @@ -20,6 +20,7 @@ import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.ChannelUriStringBuilder import mu.KLogger import java.lang.Thread.sleep +import java.util.concurrent.* /** * For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER @@ -61,25 +62,25 @@ internal open class IpcMediaDriverConnection(streamId: Int, logger.trace("IPC server sub URI: ${subscriptionUri.build()}") } + var success = false + // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // publication of any state to other threads and not be long running or re-entrant with the client. - var startTime = System.currentTimeMillis() - - var success = false - // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription) // this will wait for the server to acknowledge the connection (all via aeron) - while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { + val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + var startTime = System.nanoTime() + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { if (subscription.isConnected && subscription.imageCount() > 0) { success = true break } - sleep(100L) + sleep(500L) } @@ -93,13 +94,13 @@ internal open class IpcMediaDriverConnection(streamId: Int, // this will wait for the server to acknowledge the connection (all via aeron) startTime = System.currentTimeMillis() - while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { if (publication.isConnected) { success = true break } - sleep(100L) + sleep(500L) } if (!success) { diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index 8cfd5751..3b2eb354 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -17,7 +17,6 @@ package dorkbox.network.aeron -import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.Publication import io.aeron.Subscription import mu.KLogger @@ -25,7 +24,7 @@ import mu.KLogger abstract class MediaDriverConnection( val publicationPort: Int, val subscriptionPort: Int, val streamId: Int, val sessionId: Int, - val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable { + val connectionTimeoutSec: Int, val isReliable: Boolean) : AutoCloseable { lateinit var subscription: Subscription lateinit var publication: Publication diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index c1b1651f..5290f9b3 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -25,7 +25,7 @@ import mu.KLogger import java.lang.Thread.sleep import java.net.Inet4Address import java.net.InetAddress -import java.util.concurrent.TimeUnit +import java.util.concurrent.* /** * For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER. @@ -36,9 +36,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, subscriptionPort: Int, streamId: Int, sessionId: Int, - connectionTimeoutMS: Long = 0, + connectionTimeoutSec: Int = 0, isReliable: Boolean = true) : - UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { var success: Boolean = false @@ -110,7 +110,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, // this will wait for the server to acknowledge the connection (all via aeron) - val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS) + val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) var startTime = System.nanoTime() while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { if (subscription.isConnected) { @@ -118,7 +118,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, break } - sleep(100L) + sleep(500L) } if (!success) { @@ -139,7 +139,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, break } - sleep(100L) + sleep(500L) } if (!success) { diff --git a/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt index 0ae4c37d..f9ae8df0 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt @@ -18,8 +18,8 @@ package dorkbox.network.aeron abstract class UdpMediaDriverConnection(publicationPort: Int, subscriptionPort: Int, streamId: Int, sessionId: Int, - connectionTimeoutMS: Long, isReliable: Boolean) : + connectionTimeoutSec: Int, isReliable: Boolean) : MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, - connectionTimeoutMS, isReliable) { + connectionTimeoutSec, isReliable) { } diff --git a/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt index f2317487..35ffbff0 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt @@ -29,9 +29,9 @@ internal class UdpMediaDriverPairedConnection(listenAddress: InetAddress, subscriptionPort: Int, streamId: Int, sessionId: Int, - connectionTimeoutMS: Long = 0, + connectionTimeoutSec: Int = 0, isReliable: Boolean = true) : - UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { override fun toString(): String { return "$remoteAddressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" diff --git a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt index b1602863..8dd95d7c 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt @@ -34,9 +34,9 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres subscriptionPort: Int, streamId: Int, sessionId: Int, - connectionTimeoutMS: Long = 0, + connectionTimeoutSec: Int = 0, isReliable: Boolean = true) : - UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { var success: Boolean = false diff --git a/src/dorkbox/network/handshake/ClientHandshake.kt b/src/dorkbox/network/handshake/ClientHandshake.kt index 9904e4c8..5b256690 100644 --- a/src/dorkbox/network/handshake/ClientHandshake.kt +++ b/src/dorkbox/network/handshake/ClientHandshake.kt @@ -26,6 +26,7 @@ import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.Header import mu.KLogger import org.agrona.DirectBuffer +import java.util.concurrent.* internal class ClientHandshake( private val crypto: CryptoManagement, @@ -79,7 +80,7 @@ internal class ClientHandshake( return@FragmentAssembler } - // this is an retry message + // this is a retry message // this can happen if there are multiple connections from the SAME ip address (ie: localhost) if (message.state == HandshakeMessage.RETRY) { needToRetry = true @@ -143,7 +144,7 @@ internal class ClientHandshake( } // called from the connect thread - fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long) : ClientConnectionInfo { + fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo { failed = false oneTimeKey = endPoint.crypto.secureRandom.nextInt() val publicKey = endPoint.storage.getPublicKey()!! @@ -163,8 +164,9 @@ internal class ClientHandshake( // block until we receive the connection information from the server var pollCount: Int - val startTime = System.currentTimeMillis() - while (connectionTimeoutMS == 0L || System.currentTimeMillis() - startTime < connectionTimeoutMS) { + val startTime = System.nanoTime() + val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { // NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment. // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)` pollCount = subscription.poll(handler, 1) @@ -192,7 +194,7 @@ internal class ClientHandshake( } // called from the connect thread - fun handshakeDone(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long): Boolean { + fun handshakeDone(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int): Boolean { val registrationMessage = HandshakeMessage.doneFromClient(oneTimeKey) // Send the done message to the server. @@ -210,8 +212,9 @@ internal class ClientHandshake( val subscription = handshakeConnection.subscription val pollIdleStrategy = endPoint.pollIdleStrategyHandShake - var startTime = System.currentTimeMillis() - while (connectionTimeoutMS == 0L || System.currentTimeMillis() - startTime < connectionTimeoutMS) { + val timoutInNanos = TimeUnit.SECONDS.toMillis(connectionTimeoutSec.toLong()) + var startTime = System.nanoTime() + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { // NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment. // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)` pollCount = subscription.poll(handler, 1) @@ -224,7 +227,7 @@ internal class ClientHandshake( needToRetry = false // start over with the timeout! - startTime = System.currentTimeMillis() + startTime = System.nanoTime() } // 0 means we idle. >0 means reset and don't idle (because there are likely more)