Timeout is now in nanos, client connections time out differently now.

This commit is contained in:
Robinson 2022-03-24 00:32:58 +01:00
parent 5c854c4fab
commit b597bcd497
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
11 changed files with 179 additions and 114 deletions

View File

@ -23,6 +23,7 @@ import dorkbox.netUtil.Inet4
import dorkbox.netUtil.Inet6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.MediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
@ -38,11 +39,13 @@ import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.util.concurrent.*
/**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
@ -51,6 +54,7 @@ import java.net.InetAddress
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/
@Suppress("unused")
open class Client<CONNECTION : Connection>(
config: Configuration = Configuration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION = {
@ -141,32 +145,29 @@ open class Client<CONNECTION : Connection>(
reliable: Boolean = true) {
when {
// this is default IPC settings
remoteAddress.isEmpty() && config.enableIpc == true -> {
connectIpc(connectionTimeoutSec = connectionTimeoutSec)
remoteAddress.isEmpty() && config.enableIpc -> {
connectIpc(connectionTimeoutSec)
}
IPv4.isPreferred -> {
connect(
remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
connect(remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
IPv6.isPreferred -> {
connect(
remoteAddress = Inet6.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
connect(remoteAddress = Inet6.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
// if there is no preference, then try to connect via IPv4
else -> {
connect(
remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
connect(remoteAddress = Inet4.toAddress(remoteAddress),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable
)
}
}
@ -178,7 +179,7 @@ open class Client<CONNECTION : Connection>(
* Default connection is to localhost
*
* ### For a network address, it can be:
* - a network name ("localhost", "loopback", "lo", "bob.example.org")
* - a network name ("localhost", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1")
* - an InetAddress address
*
@ -251,8 +252,6 @@ open class Client<CONNECTION : Connection>(
*
* ### Case does not matter, and "localhost" is the default.
*
* ### Case does not matter, and "localhost" is the default.
*
* @param remoteAddress The network or if localhost, IPC address for the client to connect to
* @param ipcPublicationId The IPC publication address for the client to connect to
* @param ipcSubscriptionId The IPC subscription address for the client to connect to
@ -284,7 +283,9 @@ open class Client<CONNECTION : Connection>(
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
try {
initEndpointState()
runBlocking {
initEndpointState()
}
} catch (e: Exception) {
logger.error("Unable to initialize the endpoint state", e)
return
@ -309,73 +310,114 @@ open class Client<CONNECTION : Connection>(
// - config.enableIpc
// - NULL remoteAddress
// It is entirely possible that the server does not have IPC enabled!
var isUsingIPC = false
val autoChangeToIpc = (config.enableIpc && (remoteAddress == null || remoteAddress.isLoopbackAddress)) || (!config.enableIpc && remoteAddress == null)
val autoChangeToIpc =
(config.enableIpc && (remoteAddress == null || remoteAddress.isLoopbackAddress)) || (!config.enableIpc && remoteAddress == null)
val handshake = ClientHandshake(crypto, this, logger)
val handshakeConnection = if (autoChangeToIpc) {
logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" }
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via network instead
val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID)
runBlocking {
val handshakeTimeout = 5
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
val startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
try {
val handshakeConnection = if (autoChangeToIpc) {
buildIpcHandshake(ipcSubscriptionId, ipcPublicationId, handshakeTimeout, reliable)
} else {
buildUdpHandshake(handshakeTimeout, reliable)
}
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try {
ipcConnection.buildClient(aeronDriver, logger)
isUsingIPC = true
} catch (e: Exception) {
if (remoteAddress == null) {
// if we specified that we MUST use IPC, then we have to throw the exception, because there is no IPC
throw ClientException("Unable to connect via IPC to server. No address was specified", e)
logger.info(handshakeConnection.clientInfo())
connect0(handshake, handshakeConnection, handshakeTimeout)
// once we're done with the connection process, stop trying
break
} catch (e: ClientException) {
handshake.reset()
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
delay(500)
if (logger.isTraceEnabled) {
logger.trace("Unable to connect, retrying", e)
} else {
logger.error("Unable to connect, retrying ${e.message}")
}
} catch (e: Exception) {
logger.error("Un-recoverable error. Aborting.", e)
throw e
}
}
}
}
if (isUsingIPC) {
ipcConnection
} else {
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" }
private suspend fun buildIpcHandshake(ipcSubscriptionId: Int, ipcPublicationId: Int, connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
logger.info {
"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC"
}
// try a UDP connection instead
val udpConnection = UdpMediaDriverClientConnection(
address = remoteAddress!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable)
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via network instead
val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID
)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.buildClient(aeronDriver, logger)
udpConnection
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try {
ipcConnection.buildClient(aeronDriver, logger)
return ipcConnection
} catch (e: Exception) {
if (remoteAddress == null) {
// if we specified that we MUST use IPC, then we have to throw the exception, because there is no IPC
throw ClientException("Unable to connect via IPC to server. No address was specified", e)
}
}
else {
val test = UdpMediaDriverClientConnection(
address = remoteAddress!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeronDriver, logger)
test
}
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" }
// try a UDP connection instead
val udpConnection = UdpMediaDriverClientConnection(
address = remoteAddress!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable
)
logger.info(handshakeConnection.clientInfo())
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.buildClient(aeronDriver, logger)
return udpConnection
}
private suspend fun buildUdpHandshake(connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
val test = UdpMediaDriverClientConnection(
address = remoteAddress!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = connectionTimeoutSec,
isReliable = reliable
)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeronDriver, logger)
return test
}
// the handshake process might have to restart this connection process.
private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) {
// this will block until the connection timeout, and throw an exception if we were unable to connect with the server
val isUsingIPC = handshakeConnection is IpcMediaDriverConnection
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
val connectionInfo = try {
handshake.handshakeHello(handshakeConnection, connectionTimeoutSec)
handshake.hello(handshakeConnection, connectionTimeoutSec)
} catch (e: Exception) {
logger.error("Handshake error", e)
throw e
@ -397,7 +439,7 @@ open class Client<CONNECTION : Connection>(
}
// VALIDATE:: If the the serialization DOES NOT match between the client/server, then the server will emit a log, and the
// VALIDATE:: If the serialization DOES NOT match between the client/server, then the server will emit a log, and the
// client will timeout. SPECIFICALLY.... we do not give class serialization/registration info to the client (in case the client
// is rogue, we do not want to carelessly provide info.
@ -436,7 +478,7 @@ open class Client<CONNECTION : Connection>(
//// RMI
///////////////
// we setup our kryo information once we connect to a server (using the server's kryo registration details)
// we set up our kryo information once we connect to a server (using the server's kryo registration details)
if (!serialization.finishInit(type, connectionInfo.kryoRegistrationDetails)) {
handshakeConnection.close()
@ -465,14 +507,14 @@ open class Client<CONNECTION : Connection>(
val permitConnection = listenerManager.notifyFilter(newConnection)
if (!permitConnection) {
handshakeConnection.close()
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress)} was not permitted!")
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!")
ListenerManager.cleanStackTrace(exception)
logger.error("Permission error", exception)
throw exception
}
logger.info("Adding new signature for ${IP.toString(remoteAddress)} : ${connectionInfo.publicKey.toHexString()}")
storage.addRegisteredServerKey(remoteAddress, connectionInfo.publicKey)
logger.info("Adding new signature for ${IP.toString(remoteAddress!!)} : ${connectionInfo.publicKey.toHexString()}")
storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
}
@ -482,7 +524,7 @@ open class Client<CONNECTION : Connection>(
newConnection.preCloseAction = {
// this is called whenever connection.close() is called by the framework or via client.close()
// on the client, we want to GUARANTEE that the disconnect happens-before the connect.
// on the client, we want to GUARANTEE that the disconnect happens-before connect.
if (!lockStepForConnect.compareAndSet(null, SuspendWaiter())) {
logger.error("Connection ${newConnection.id}", "close lockStep for disconnect was in the wrong state!")
}
@ -496,7 +538,7 @@ open class Client<CONNECTION : Connection>(
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
actionDispatch.eventLoop {
listenerManager.notifyDisconnect(connection)
lockStepForConnect.value?.cancel()
lockStepForConnect.getAndSet(null)?.cancel()
}
}
@ -509,8 +551,10 @@ open class Client<CONNECTION : Connection>(
// also closes the handshake (will also throw connect timeout exception)
val canFinishConnecting: Boolean
runBlocking {
// this value matches the server, and allows for a more robust connection attempt
val successAttemptTimeout = config.connectionCloseTimeoutInSeconds * 2
canFinishConnecting = try {
handshake.handshakeDone(handshakeConnection, connectionTimeoutSec)
handshake.done(handshakeConnection, successAttemptTimeout)
} catch (e: ClientException) {
logger.error("Error during handshake", e)
false

View File

@ -485,7 +485,9 @@ open class Server<CONNECTION : Connection>(
}
try {
initEndpointState()
runBlocking {
initEndpointState()
}
} catch (e: Exception) {
logger.error("Unable to initialize the endpoint state", e)
return
@ -622,7 +624,7 @@ open class Server<CONNECTION : Connection>(
ipv6Poller.close()
ipcPoller.close()
// clear all of the handshake info
// clear all the handshake info
handshake.clear()
// finish closing -- this lets us make sure that we don't run into race conditions on the thread that calls close()

View File

@ -18,19 +18,20 @@ package dorkbox.network.aeron
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
import java.lang.Thread.sleep
import java.util.concurrent.*
/**
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast
* NOTE: IPC connection will ALWAYS have a timeout of 10 second to connect. This is IPC, it should connect fast
*/
internal open class IpcMediaDriverConnection(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int,
) :
MediaDriverConnection(0, 0, streamId, sessionId, 1, true) {
MediaDriverConnection(0, 0, streamId, sessionId, 10, true) {
var success: Boolean = false
@ -48,7 +49,7 @@ internal open class IpcMediaDriverConnection(streamId: Int,
*
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
@ -74,13 +75,13 @@ internal open class IpcMediaDriverConnection(streamId: Int,
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected && subscription.imageCount() > 0) {
success = true
break
}
sleep(500L)
delay(500L) // not delay? maybe coroutines?
}
@ -94,13 +95,13 @@ internal open class IpcMediaDriverConnection(streamId: Int,
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) {
success = true
break
}
sleep(500L)
delay(500L) // not delay? maybe coroutines?
}
if (!success) {

View File

@ -30,7 +30,7 @@ abstract class MediaDriverConnection(
lateinit var publication: Publication
abstract fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
abstract fun clientInfo() : String

View File

@ -21,8 +21,8 @@ import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
import java.lang.Thread.sleep
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.*
@ -79,7 +79,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
@Suppress("DuplicatedCode")
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = aeronConnectionString(address)
// Create a publication at the given address and port, using the given stream ID.
@ -112,13 +112,13 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected) {
success = true
break
}
sleep(500L)
delay(500L)
}
if (!success) {
@ -133,13 +133,13 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) {
success = true
break
}
sleep(500L)
delay(500L)
}
if (!success) {

View File

@ -29,7 +29,7 @@ internal class UdpMediaDriverPairedConnection(listenAddress: InetAddress,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutSec: Int = 0,
connectionTimeoutSec: Int,
isReliable: Boolean = true) :
UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {

View File

@ -34,7 +34,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutSec: Int = 0,
connectionTimeoutSec: Int,
isReliable: Boolean = true) :
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
@ -64,7 +64,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
}
@Suppress("DuplicatedCode")
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
throw ServerException("Client info not implemented in Server MDC")
}

View File

@ -351,12 +351,13 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
subscription.close()
val timeOut = TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
var closeTimeoutTime = System.currentTimeMillis() + timeOut
val timoutInNanos = TimeUnit.SECONDS.toNanos(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
var closeTimeoutTime = System.nanoTime()
// we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything
// we want a timeout-check, otherwise this will run forever
while (messagesInProgress.value != 0 && System.currentTimeMillis() < closeTimeoutTime) {
while (messagesInProgress.value != 0 && System.nanoTime() - closeTimeoutTime < timoutInNanos) {
delay(50)
}
@ -364,8 +365,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val logFile = endPoint.aeronDriver.getMediaDriverPublicationFile(publication.registrationId())
publication.close()
closeTimeoutTime = System.currentTimeMillis() + timeOut
while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) {
closeTimeoutTime = System.nanoTime()
while (logFile.exists() && System.nanoTime() - closeTimeoutTime < timoutInNanos) {
if (logFile.delete()) {
break
}

View File

@ -36,7 +36,6 @@ import dorkbox.network.rmi.messages.RmiMessage
import dorkbox.network.serialization.KryoExtra
import dorkbox.network.serialization.Serialization
import dorkbox.network.serialization.SettingsStore
import dorkbox.util.exceptions.SecurityException
import io.aeron.Publication
import io.aeron.driver.MediaDriver
import io.aeron.logbuffer.Header
@ -176,7 +175,7 @@ internal constructor(val type: Class<*>,
/**
* @throws Exception if there is a problem starting the media driver
*/
internal fun initEndpointState() {
internal suspend fun initEndpointState() {
shutdown.getAndSet(false)
shutdownWaiter = SuspendWaiter()

View File

@ -24,6 +24,7 @@ import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.FragmentAssembler
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import kotlinx.coroutines.delay
import mu.KLogger
import org.agrona.DirectBuffer
import java.util.concurrent.*
@ -88,7 +89,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
if (oneTimeKey != message.oneTimeKey) {
logger.error("[$message.sessionId] ignored message (one-time key: ${message.oneTimeKey}) intended for another client (mine is: ${oneTimeKey})")
logger.error("[$message] ignored message (one-time key: ${message.oneTimeKey}) intended for another client (mine is: ${oneTimeKey})")
return@FragmentAssembler
}
@ -144,7 +145,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
fun hello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
failed = false
oneTimeKey = endPoint.crypto.secureRandom.nextInt()
val publicKey = endPoint.storage.getPublicKey()!!
@ -166,7 +167,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
val startTime = System.nanoTime()
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1)
@ -194,7 +195,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
fun handshakeDone(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int): Boolean {
suspend fun done(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int): Boolean {
val registrationMessage = HandshakeMessage.doneFromClient(oneTimeKey)
// Send the done message to the server.
@ -214,7 +215,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
while (System.nanoTime() - startTime < timoutInNanos) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1)
@ -230,6 +231,8 @@ internal class ClientHandshake<CONNECTION: Connection>(
startTime = System.nanoTime()
}
delay(100)
// 0 means we idle. >0 means reset and don't idle (because there are likely more)
pollIdleStrategy.idle(pollCount)
}
@ -246,4 +249,13 @@ internal class ClientHandshake<CONNECTION: Connection>(
return connectionDone
}
fun reset() {
oneTimeKey = 0
connectionHelloInfo = null
connectionDone = false
needToRetry = false
failedMessage = ""
failed = true
}
}

View File

@ -42,7 +42,7 @@ import java.util.concurrent.*
* 'notifyConnect' must be THE ONLY THING in this class to use the action dispatch!
*/
@Suppress("DuplicatedCode")
internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLogger,
private val config: ServerConfiguration,
private val listenerManager: ListenerManager<CONNECTION>) {
@ -50,13 +50,8 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
private val pendingConnections = ExpiringMap.builder()
.expiration(config.connectionCloseTimeoutInSeconds.toLong() * 2, TimeUnit.SECONDS)
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<Int, CONNECTION> { _, connection ->
// this blocks until it fully runs (which is ok. this is fast)
logger.error("[${connection.id}] Timed out waiting for registration response from client")
runBlocking {
connection.close()
}
.expirationListener<Int, CONNECTION> { sessionId, connection ->
expirePendingConnections(sessionId, connection)
}
.build<Int, CONNECTION>()
@ -68,6 +63,16 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE)
private fun expirePendingConnections(sessionId: Int, connection: CONNECTION) {
// this blocks until it fully runs (which is ok. this is fast)
logger.error("[${connection.id}] Timed out waiting for registration response from client")
pendingConnections.remove(sessionId)
runBlocking {
connection.close()
}
}
/**
* @return true if we should continue parsing the incoming message, false if we should abort
*/
@ -427,7 +432,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
return
}
// the pub/sub do not necessarily have to be the same. The can be ANY port
// the pub/sub do not necessarily have to be the same. They can be ANY port
val publicationPort = config.publicationPort
val subscriptionPort = config.subscriptionPort
@ -507,7 +512,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
// before we notify connect, we have to wait for the client to tell us that they can receive data
pendingConnections[sessionId] = connection
// this tells the client all of the info to connect.
// this tells the client all the info to connect.
server.writeHandshakeMessage(handshakePublication, successMessage) // exception is already caught
} catch (e: Exception) {
// have to unwind actions!