Connection handshake timeouts are more standardized to nanoseconds
This commit is contained in:
parent
23d4ea4609
commit
2b4ba1347e
|
@ -517,19 +517,19 @@ open class Client<CONNECTION : Connection>(
|
|||
(config.enableIpc && (remoteAddress == null || isSelfMachine)) || (!config.enableIpc && remoteAddress == null)
|
||||
|
||||
// how long does the initial handshake take to connect
|
||||
var handshakeTimeoutSec = 5
|
||||
var handshakeTimeoutNs = aeronDriver.publicationConnectionTimeoutNs() + aeronDriver.lingerNs()
|
||||
// how long before we COMPLETELY give up retrying. A '0' means try forever.
|
||||
var connectionTimoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
|
||||
var connectionTimoutInNs = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
|
||||
|
||||
if (DEBUG_CONNECTIONS) {
|
||||
// connections are extremely difficult to diagnose when the connection timeout is short
|
||||
connectionTimoutInNanos = TimeUnit.HOURS.toNanos(1).toLong()
|
||||
handshakeTimeoutSec = TimeUnit.HOURS.toSeconds(1).toInt()
|
||||
connectionTimoutInNs = TimeUnit.HOURS.toNanos(1)
|
||||
handshakeTimeoutNs = TimeUnit.HOURS.toNanos(1)
|
||||
}
|
||||
|
||||
val startTime = System.nanoTime()
|
||||
var success = false
|
||||
while (connectionTimoutInNanos == 0L || System.nanoTime() - startTime < connectionTimoutInNanos) {
|
||||
while (connectionTimoutInNs == 0L || System.nanoTime() - startTime < connectionTimoutInNs) {
|
||||
if (isShutdown()) {
|
||||
resetOnError()
|
||||
|
||||
|
@ -570,7 +570,7 @@ open class Client<CONNECTION : Connection>(
|
|||
remoteAddressString = remoteAddressString,
|
||||
remotePort = port,
|
||||
port = config.port,
|
||||
handshakeTimeoutSec = handshakeTimeoutSec,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
reliable = reliable,
|
||||
logger = logger
|
||||
)
|
||||
|
@ -584,7 +584,7 @@ open class Client<CONNECTION : Connection>(
|
|||
logger.info { "Creating new handshake to $logInfo" }
|
||||
}
|
||||
|
||||
connect0(handshake, handshakeConnection, handshakeTimeoutSec)
|
||||
connect0(handshake, handshakeConnection, handshakeTimeoutNs)
|
||||
success = true
|
||||
slowDownForException = false
|
||||
|
||||
|
@ -636,7 +636,7 @@ open class Client<CONNECTION : Connection>(
|
|||
if (!success) {
|
||||
endpointIsRunning.lazySet(false)
|
||||
|
||||
if (System.nanoTime() - startTime < connectionTimoutInNanos) {
|
||||
if (System.nanoTime() - startTime < connectionTimoutInNs) {
|
||||
|
||||
val type = if (connection0 == null) {
|
||||
"UNKNOWN"
|
||||
|
@ -664,12 +664,15 @@ open class Client<CONNECTION : Connection>(
|
|||
|
||||
|
||||
// the handshake process might have to restart this connection process.
|
||||
private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: ClientHandshakeDriver, connectionTimeoutSec: Int) {
|
||||
private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: ClientHandshakeDriver, handshakeTimeoutNs: Long) {
|
||||
// this will block until the connection timeout, and throw an exception if we were unable to connect with the server
|
||||
|
||||
|
||||
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
|
||||
val connectionInfo = handshake.hello(handshakeConnection, connectionTimeoutSec)
|
||||
val connectionInfo = handshake.hello(
|
||||
handshakeConnection = handshakeConnection,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs
|
||||
)
|
||||
|
||||
// VALIDATE:: check to see if the remote connection's public key has changed!
|
||||
val validateRemoteAddress = if (handshakeConnection.pubSub.isIpc) {
|
||||
|
@ -723,7 +726,12 @@ open class Client<CONNECTION : Connection>(
|
|||
|
||||
|
||||
// we are now connected, so we can connect to the NEW client-specific ports
|
||||
val clientConnection = ClientConnectionDriver.build(aeronDriver, connectionTimeoutSec, handshakeConnection, connectionInfo)
|
||||
val clientConnection = ClientConnectionDriver.build(
|
||||
aeronDriver = aeronDriver,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
handshakeConnection = handshakeConnection,
|
||||
connectionInfo = connectionInfo
|
||||
)
|
||||
|
||||
val pubSub = clientConnection.connectionInfo
|
||||
val logInfo = pubSub.getLogInfo(logger.isDebugEnabled)
|
||||
|
@ -755,7 +763,10 @@ open class Client<CONNECTION : Connection>(
|
|||
// also closes the handshake (will also throw connect timeout exception)
|
||||
|
||||
try {
|
||||
handshake.done(handshakeConnection, clientConnection, connectionTimeoutSec, handshakeConnection.details)
|
||||
handshake.done(handshakeConnection, clientConnection,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
aeronLogInfo = handshakeConnection.details
|
||||
)
|
||||
} catch (e: Exception) {
|
||||
listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] error during handshake", e))
|
||||
throw e
|
||||
|
|
|
@ -25,6 +25,7 @@ import dorkbox.network.connection.EndPoint
|
|||
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
|
||||
import dorkbox.network.exceptions.AllocationException
|
||||
import dorkbox.network.handshake.RandomId65kAllocator
|
||||
import dorkbox.util.Sys
|
||||
import io.aeron.*
|
||||
import io.aeron.driver.reports.LossReportReader
|
||||
import io.aeron.driver.reports.LossReportUtil
|
||||
|
@ -45,7 +46,6 @@ import org.agrona.concurrent.ringbuffer.RingBufferDescriptor
|
|||
import org.agrona.concurrent.status.CountersReader
|
||||
import org.slf4j.Logger
|
||||
import java.io.File
|
||||
import java.util.concurrent.*
|
||||
|
||||
fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, port: Int): ChannelUriStringBuilder {
|
||||
this.endpoint(AeronDriver.address(isIpv4, addressString, port))
|
||||
|
@ -458,7 +458,7 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
|
|||
*/
|
||||
suspend fun waitForConnection(
|
||||
publication: Publication,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
logInfo: String,
|
||||
onErrorHandler: suspend (Throwable) -> Exception
|
||||
) {
|
||||
|
@ -466,10 +466,9 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
|
|||
return
|
||||
}
|
||||
|
||||
val timeoutInNanos = TimeUnit.SECONDS.toNanos(handshakeTimeoutSec.toLong())
|
||||
val startTime = System.nanoTime()
|
||||
|
||||
while (System.nanoTime() - startTime < timeoutInNanos) {
|
||||
while (System.nanoTime() - startTime < handshakeTimeoutNs) {
|
||||
if (publication.isConnected) {
|
||||
return
|
||||
}
|
||||
|
@ -479,7 +478,7 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
|
|||
|
||||
close(publication, logInfo)
|
||||
|
||||
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in $handshakeTimeoutSec seconds while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}"))
|
||||
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}"))
|
||||
exception.cleanAllStackTrace()
|
||||
throw exception
|
||||
}
|
||||
|
@ -646,6 +645,13 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
|
|||
*/
|
||||
fun lingerNs(): Long = internal.lingerNs()
|
||||
|
||||
/**
|
||||
* @return Time in nanoseconds a publication will be considered not connected if no status messages are received.
|
||||
*/
|
||||
fun publicationConnectionTimeoutNs(): Long {
|
||||
return internal.publicationConnectionTimeoutNs()
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that we DO NOT approach the Aeron linger timeout!
|
||||
*/
|
||||
|
|
|
@ -936,6 +936,13 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|||
return context.context.publicationLingerTimeoutNs()
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Time in nanoseconds a publication will be considered not connected if no status messages are received.
|
||||
*/
|
||||
fun publicationConnectionTimeoutNs(): Long {
|
||||
return context.context.publicationConnectionTimeoutNs()
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that we DO NOT approach the Aeron linger timeout!
|
||||
*/
|
||||
|
@ -965,4 +972,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|||
override fun toString(): String {
|
||||
return "Aeron Driver [${driverId}]"
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
companion object {
|
||||
suspend fun build(
|
||||
aeronDriver: AeronDriver,
|
||||
connectionTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
handshakeConnection: ClientHandshakeDriver,
|
||||
connectionInfo: ClientConnectionInfo
|
||||
): ClientConnectionDriver {
|
||||
|
@ -66,7 +66,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
|
||||
pubSub = buildIPC(
|
||||
aeronDriver = aeronDriver,
|
||||
handshakeTimeoutSec = connectionTimeoutSec,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
sessionIdPub = sessionIdPub,
|
||||
sessionIdSub = sessionIdSub,
|
||||
streamIdPub = streamIdPub,
|
||||
|
@ -89,7 +89,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
|
||||
pubSub = buildUDP(
|
||||
aeronDriver = aeronDriver,
|
||||
handshakeTimeoutSec = connectionTimeoutSec,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
sessionIdPub = sessionIdPub,
|
||||
sessionIdSub = sessionIdSub,
|
||||
streamIdPub = streamIdPub,
|
||||
|
@ -109,7 +109,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
@Throws(ClientTimedOutException::class)
|
||||
private suspend fun buildIPC(
|
||||
aeronDriver: AeronDriver,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
sessionIdPub: Int,
|
||||
sessionIdSub: Int,
|
||||
streamIdPub: Int,
|
||||
|
@ -132,7 +132,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutSec, logInfo) { cause ->
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server!", cause)
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
@Throws(ClientTimedOutException::class)
|
||||
private suspend fun buildUDP(
|
||||
aeronDriver: AeronDriver,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
sessionIdPub: Int,
|
||||
sessionIdSub: Int,
|
||||
streamIdPub: Int,
|
||||
|
@ -180,7 +180,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutSec, logInfo) { cause ->
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server $remoteAddressString", cause)
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import dorkbox.network.connection.CryptoManagement
|
|||
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
|
||||
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
|
||||
import dorkbox.network.exceptions.*
|
||||
import dorkbox.util.Sys
|
||||
import io.aeron.FragmentAssembler
|
||||
import io.aeron.Image
|
||||
import io.aeron.logbuffer.FragmentHandler
|
||||
|
@ -28,7 +29,6 @@ import io.aeron.logbuffer.Header
|
|||
import kotlinx.coroutines.delay
|
||||
import mu.KLogger
|
||||
import org.agrona.DirectBuffer
|
||||
import java.util.concurrent.*
|
||||
|
||||
internal class ClientHandshake<CONNECTION: Connection>(
|
||||
private val client: Client<CONNECTION>,
|
||||
|
@ -177,7 +177,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
|
||||
// called from the connect thread
|
||||
// when exceptions are thrown, the handshake pub/sub will be closed
|
||||
suspend fun hello(handshakeConnection: ClientHandshakeDriver, handshakeTimeoutSec: Int) : ClientConnectionInfo {
|
||||
suspend fun hello(handshakeConnection: ClientHandshakeDriver, handshakeTimeoutNs: Long) : ClientConnectionInfo {
|
||||
val pubSub = handshakeConnection.pubSub
|
||||
|
||||
// is our pub still connected??
|
||||
|
@ -205,9 +205,8 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
|
||||
// block until we receive the connection information from the server
|
||||
|
||||
val timoutInNanos = TimeUnit.SECONDS.toNanos(handshakeTimeoutSec.toLong()) + client.aeronDriver.lingerNs()
|
||||
val startTime = System.nanoTime()
|
||||
while (System.nanoTime() - startTime < timoutInNanos) {
|
||||
while (System.nanoTime() - startTime < handshakeTimeoutNs) {
|
||||
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
|
||||
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
|
||||
pubSub.sub.poll(handler, 1)
|
||||
|
@ -230,8 +229,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
if (connectionHelloInfo == null) {
|
||||
handshakeConnection.close()
|
||||
|
||||
val timeout = TimeUnit.NANOSECONDS.toSeconds(client.aeronDriver.lingerNs()) + handshakeTimeoutSec
|
||||
val exception = ClientTimedOutException("$handshakeConnection Waiting for registration response from server for more than $timeout seconds")
|
||||
val exception = ClientTimedOutException("$handshakeConnection Waiting for registration response from server for more than ${Sys.getTimePrettyFull(handshakeTimeoutNs)}")
|
||||
throw exception
|
||||
}
|
||||
|
||||
|
@ -243,7 +241,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
suspend fun done(
|
||||
handshakeConnection: ClientHandshakeDriver,
|
||||
clientConnection: ClientConnectionDriver,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
aeronLogInfo: String
|
||||
) {
|
||||
val pubSub = clientConnection.connectionInfo
|
||||
|
@ -273,9 +271,8 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
connectionDone = false
|
||||
|
||||
// block until we receive the connection information from the server
|
||||
val timoutInNanos = TimeUnit.SECONDS.toNanos(handshakeTimeoutSec.toLong())
|
||||
var startTime = System.nanoTime()
|
||||
while (System.nanoTime() - startTime < timoutInNanos) {
|
||||
while (System.nanoTime() - startTime < handshakeTimeoutNs) {
|
||||
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
|
||||
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
|
||||
handshakePubSub.sub.poll(handler, 1)
|
||||
|
@ -305,7 +302,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
|||
// since this failed, close everything
|
||||
handshakeConnection.close()
|
||||
|
||||
val exception = ClientTimedOutException("Timed out waiting for registration response from server: $handshakeTimeoutSec seconds")
|
||||
val exception = ClientTimedOutException("Timed out waiting for registration response from server: ${Sys.getTimePrettyFull(handshakeTimeoutNs)}")
|
||||
throw exception
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInter
|
|||
import dorkbox.network.exceptions.ClientException
|
||||
import dorkbox.network.exceptions.ClientRetryException
|
||||
import dorkbox.network.exceptions.ClientTimedOutException
|
||||
import dorkbox.util.Sys
|
||||
import io.aeron.CommonContext
|
||||
import io.aeron.Subscription
|
||||
import mu.KLogger
|
||||
|
@ -61,7 +62,7 @@ internal class ClientHandshakeDriver(
|
|||
remoteAddressString: String,
|
||||
remotePort: Int,
|
||||
port: Int,
|
||||
handshakeTimeoutSec: Int = 10,
|
||||
handshakeTimeoutNs: Long,
|
||||
reliable: Boolean,
|
||||
logger: KLogger
|
||||
): ClientHandshakeDriver {
|
||||
|
@ -104,7 +105,7 @@ internal class ClientHandshakeDriver(
|
|||
try {
|
||||
pubSub = buildIPC(
|
||||
aeronDriver = aeronDriver,
|
||||
handshakeTimeoutSec = handshakeTimeoutSec,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
sessionIdPub = sessionIdPub,
|
||||
streamIdPub = streamIdPub,
|
||||
streamIdSub = streamIdSub,
|
||||
|
@ -148,7 +149,7 @@ internal class ClientHandshakeDriver(
|
|||
|
||||
pubSub = buildUDP(
|
||||
aeronDriver = aeronDriver,
|
||||
handshakeTimeoutSec = handshakeTimeoutSec,
|
||||
handshakeTimeoutNs = handshakeTimeoutNs,
|
||||
remoteAddress = remoteAddress,
|
||||
remoteAddressString = remoteAddressString,
|
||||
portPub = remotePort,
|
||||
|
@ -181,7 +182,7 @@ internal class ClientHandshakeDriver(
|
|||
@Throws(ClientTimedOutException::class)
|
||||
private suspend fun buildIPC(
|
||||
aeronDriver: AeronDriver,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
sessionIdPub: Int,
|
||||
streamIdPub: Int, streamIdSub: Int,
|
||||
reliable: Boolean,
|
||||
|
@ -203,8 +204,8 @@ internal class ClientHandshakeDriver(
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutSec, logInfo) { cause ->
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server!", cause)
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause)
|
||||
}
|
||||
|
||||
// Create a subscription at the given address and port, using the given stream ID.
|
||||
|
@ -220,7 +221,7 @@ internal class ClientHandshakeDriver(
|
|||
@Throws(ClientTimedOutException::class)
|
||||
private suspend fun buildUDP(
|
||||
aeronDriver: AeronDriver,
|
||||
handshakeTimeoutSec: Int,
|
||||
handshakeTimeoutNs: Long,
|
||||
remoteAddress: InetAddress,
|
||||
remoteAddressString: String,
|
||||
portPub: Int,
|
||||
|
@ -257,9 +258,9 @@ internal class ClientHandshakeDriver(
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutSec, logInfo) { cause ->
|
||||
aeronDriver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
streamIdAllocator.free(streamIdSub) // we don't continue, so close this as well
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server!", cause)
|
||||
ClientTimedOutException("$logInfo publication cannot connect with server in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -48,6 +48,8 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
val aeronDriver: AeronDriver
|
||||
) {
|
||||
|
||||
|
||||
|
||||
// note: the expire time here is a LITTLE longer than the expire time in the client, this way we can adjust for network lag if it's close
|
||||
private val pendingConnections = ExpiringMap.builder()
|
||||
.apply {
|
||||
|
@ -71,6 +73,23 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
|
||||
internal val connectionsPerIpCounts = ConnectionCounts()
|
||||
|
||||
/**
|
||||
* how long does the initial handshake take to connect
|
||||
*/
|
||||
internal var handshakeTimeoutNs: Long
|
||||
|
||||
init {
|
||||
// we MUST include the publication linger timeout, otherwise we might encounter problems that are NOT REALLY problems
|
||||
var handshakeTimeoutNs = aeronDriver.publicationConnectionTimeoutNs() + aeronDriver.lingerNs()
|
||||
|
||||
if (EndPoint.DEBUG_CONNECTIONS) {
|
||||
// connections are extremely difficult to diagnose when the connection timeout is short
|
||||
handshakeTimeoutNs = TimeUnit.HOURS.toNanos(1)
|
||||
}
|
||||
|
||||
this.handshakeTimeoutNs = handshakeTimeoutNs
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should continue parsing the incoming message, false if we should abort (as we are DONE processing data)
|
||||
*/
|
||||
|
|
|
@ -25,11 +25,15 @@ import dorkbox.network.ServerConfiguration
|
|||
import dorkbox.network.aeron.AeronDriver
|
||||
import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake
|
||||
import dorkbox.network.aeron.AeronPoller
|
||||
import dorkbox.network.connection.*
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.connection.ConnectionParams
|
||||
import dorkbox.network.connection.EventDispatcher
|
||||
import dorkbox.network.connection.IpInfo
|
||||
import dorkbox.network.exceptions.ServerException
|
||||
import dorkbox.network.exceptions.ServerHandshakeException
|
||||
import dorkbox.network.exceptions.ServerTimedoutException
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import dorkbox.util.Sys
|
||||
import io.aeron.CommonContext
|
||||
import io.aeron.FragmentAssembler
|
||||
import io.aeron.Image
|
||||
|
@ -61,18 +65,14 @@ internal object ServerHandshakePollers {
|
|||
val connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION
|
||||
): FragmentHandler {
|
||||
|
||||
private val connectionTimeoutSec = server.config.connectionCloseTimeoutInSeconds
|
||||
private val isReliable = server.config.isReliable
|
||||
private val handshaker = server.handshaker
|
||||
private val handshakeTimeoutNs = handshake.handshakeTimeoutNs
|
||||
|
||||
// note: the expire time here is a LITTLE longer than the expire time in the client, this way we can adjust for network lag if it's close
|
||||
private val publications = ExpiringMap.builder()
|
||||
.apply {
|
||||
// connections are extremely difficult to diagnose when the connection timeout is short
|
||||
val timeUnit = if (EndPoint.DEBUG_CONNECTIONS) { TimeUnit.HOURS } else { TimeUnit.NANOSECONDS }
|
||||
|
||||
// we MUST include the publication linger timeout, otherwise we might encounter problems that are NOT REALLY problems
|
||||
this.expiration(TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong() * 2) + driver.lingerNs(), timeUnit)
|
||||
this.expiration(handshakeTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
}
|
||||
.expirationPolicy(ExpirationPolicy.CREATED)
|
||||
.expirationListener<Long, Publication> { connectKey, publication ->
|
||||
|
@ -146,8 +146,8 @@ internal object ServerHandshakePollers {
|
|||
|
||||
try {
|
||||
// we actually have to wait for it to connect before we continue
|
||||
driver.waitForConnection(publication, connectionTimeoutSec, logInfo) { cause ->
|
||||
ServerTimedoutException("$logInfo publication cannot connect with client!", cause)
|
||||
driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
|
||||
|
@ -245,16 +245,13 @@ internal object ServerHandshakePollers {
|
|||
|
||||
private val ipInfo = server.ipInfo
|
||||
private val handshaker = server.handshaker
|
||||
private val connectionTimeoutSec = server.config.connectionCloseTimeoutInSeconds
|
||||
private val handshakeTimeoutNs = handshake.handshakeTimeoutNs
|
||||
|
||||
// note: the expire time here is a LITTLE longer than the expire time in the client, this way we can adjust for network lag if it's close
|
||||
private val publications = ExpiringMap.builder()
|
||||
.apply {
|
||||
// connections are extremely difficult to diagnose when the connection timeout is short
|
||||
val timeUnit = if (EndPoint.DEBUG_CONNECTIONS) { TimeUnit.HOURS } else { TimeUnit.NANOSECONDS }
|
||||
|
||||
// we MUST include the publication linger timeout, otherwise we might encounter problems that are NOT REALLY problems
|
||||
this.expiration(TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong() * 2) + driver.lingerNs(), timeUnit)
|
||||
this.expiration(handshakeTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
}
|
||||
.expirationPolicy(ExpirationPolicy.CREATED)
|
||||
.expirationListener<Long, Publication> { connectKey, publication ->
|
||||
|
@ -373,8 +370,8 @@ internal object ServerHandshakePollers {
|
|||
|
||||
try {
|
||||
// we actually have to wait for it to connect before we continue
|
||||
driver.waitForConnection(publication, connectionTimeoutSec, logInfo) { cause ->
|
||||
ServerTimedoutException("$logInfo publication cannot connect with client!", cause)
|
||||
driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause ->
|
||||
ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
|
||||
|
|
|
@ -25,6 +25,7 @@ import kotlinx.coroutines.runBlocking
|
|||
import mu.KotlinLogging
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.*
|
||||
|
||||
class AeronPubSubTest : BaseTest() {
|
||||
@Test
|
||||
|
@ -36,7 +37,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
val totalCount = 40
|
||||
val port = 3535
|
||||
val serverStreamId = 55555
|
||||
val handshakeTimeoutSec = 10
|
||||
val handshakeTimeoutNs = TimeUnit.SECONDS.toNanos(10)
|
||||
|
||||
|
||||
|
||||
|
@ -81,7 +82,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
clientDriver.waitForConnection(publication, handshakeTimeoutSec, "client_$index") { cause ->
|
||||
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
|
||||
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
|
||||
}
|
||||
|
||||
|
@ -121,7 +122,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
val totalCount = 40
|
||||
val port = 3535
|
||||
val serverStreamId = 55555
|
||||
val handshakeTimeoutSec = 10
|
||||
val handshakeTimeoutNs = TimeUnit.SECONDS.toNanos(10)
|
||||
|
||||
|
||||
|
||||
|
@ -168,7 +169,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
|
||||
// can throw an exception! We catch it in the calling class
|
||||
// we actually have to wait for it to connect before we continue
|
||||
clientDriver.waitForConnection(publication, handshakeTimeoutSec, "client_$index") { cause ->
|
||||
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
|
||||
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue