diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index dca97f7f..ba58a497 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -628,9 +628,9 @@ open class Client( // because we are getting the class registration details from the SERVER, this should never be the case. // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) val exception = if (isUsingIPC) { - ClientRejectedException("Connection to IPC has incorrect class registration details!!") + ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!") } else { - ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!") + ClientRejectedException("[${handshake.connectKey}] Connection to $remoteAddressString has incorrect class registration details!!") } ListenerManager.cleanStackTraceInternal(exception) throw exception @@ -649,13 +649,13 @@ open class Client( val permitConnection = listenerManager.notifyFilter(newConnection) if (!permitConnection) { handshakeConnection.close() - val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!") + val exception = ClientRejectedException("[${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString was not permitted!") ListenerManager.cleanStackTrace(exception) logger.error(exception) { "Permission error" } throw exception } - logger.info { "Adding new signature for ${IP.toString(remoteAddress!!)} : ${connectionInfo.publicKey.toHexString()}" } + logger.info { "[${handshake.connectKey}] Connection (${newConnection.id}) : Adding new signature for ${IP.toString(remoteAddress!!)} : ${connectionInfo.publicKey.toHexString()}" } storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey) } @@ -668,7 +668,7 @@ open class Client( // on the client, we want to GUARANTEE that the disconnect happens-before connect. if (!lockStepForConnect.compareAndSet(null, Mutex(locked = true))) { - logger.error { "Connection ${newConnection.id} : close lockStep for disconnect was in the wrong state!" } + logger.error { "[${handshake.connectKey}] Connection ${newConnection.id} : close lockStep for disconnect was in the wrong state!" } } isConnected = false @@ -689,8 +689,6 @@ open class Client( connection0 = newConnection addConnection(newConnection) - logger.error { "Connection created, finishing handshake: ${handshake.connectKey}" } - // tell the server our connection handshake is done, and the connection can now listen for data. // also closes the handshake (will also throw connect timeout exception) val canFinishConnecting: Boolean @@ -708,7 +706,7 @@ open class Client( if (canFinishConnecting) { isConnected = true - + logger.debug { "[${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString done with handshake." } // this forces the current thread to WAIT until poll system has started val mutex = Mutex(locked = true) diff --git a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt index cad2a945..e61fe3b5 100644 --- a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt @@ -70,11 +70,16 @@ internal open class IpcMediaDriverConnection(streamId: Int, val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription) - // this will wait for the server to acknowledge the connection (all via aeron) + + // We must add the subscription first, because we must be available to listen when the server responds. + + val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) var startTime = System.nanoTime() + + // this will wait for the server to acknowledge the connection (all via aeron) while (System.nanoTime() - startTime < timoutInNanos) { - if (subscription.isConnected && subscription.imageCount() > 0) { + if (subscription.isConnected) { success = true break } @@ -82,20 +87,20 @@ internal open class IpcMediaDriverConnection(streamId: Int, delay(500L) } - if (!success) { subscription.close() - val clientTimedOutException = ClientTimedOutException("Creating subscription connection to aeron") - ListenerManager.cleanStackTraceInternal(clientTimedOutException) + val clientTimedOutException = ClientTimedOutException("Cannot create subscription IPC connection to server") + ListenerManager.cleanAllStackTrace(clientTimedOutException) throw clientTimedOutException } - success = false - // this will wait for the server to acknowledge the connection (all via aeron) + + success = false startTime = System.nanoTime() + while (System.nanoTime() - startTime < timoutInNanos) { if (publication.isConnected) { success = true @@ -109,8 +114,8 @@ internal open class IpcMediaDriverConnection(streamId: Int, subscription.close() publication.close() - val clientTimedOutException = ClientTimedOutException("Creating publication connection to aeron") - ListenerManager.cleanStackTraceInternal(clientTimedOutException) + val clientTimedOutException = ClientTimedOutException("Cannot create publication IPC connection to server") + ListenerManager.cleanAllStackTrace(clientTimedOutException) throw clientTimedOutException } diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index 500f3da5..0229ec2e 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -38,8 +38,8 @@ abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionP override fun close() { if (success) { - publication.close() subscription.close() + publication.close() } } } diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index 2f7aebe7..19b17779 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -21,8 +21,6 @@ import dorkbox.network.connection.ListenerManager import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.ChannelUriStringBuilder -import io.aeron.Publication -import io.aeron.Subscription import kotlinx.coroutines.delay import mu.KLogger import java.net.Inet4Address @@ -42,12 +40,6 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, isReliable: Boolean = true) : MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { - @Volatile - private var tempSubscription: Subscription? = null - - @Volatile - private var tempPublication: Publication? = null - val addressString: String by lazy { IP.toString(address) } @@ -61,7 +53,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, return builder } - val ip: String by lazy { + val ipType: String by lazy { if (address is Inet4Address) { "IPv4" } else { @@ -92,35 +84,31 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, // on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - if (tempPublication == null) { - // Create a publication at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - .endpoint("$aeronAddressString:$publicationPort") + // Create a publication at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + .endpoint("$aeronAddressString:$publicationPort") - logger.trace("client pub URI: $ip ${publicationUri.build()}") - - tempPublication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) - } - - if (tempSubscription == null) { - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - .controlEndpoint("$aeronAddressString:$subscriptionPort") - .controlMode("dynamic") - - logger.trace("client sub URI: $ip ${subscriptionUri.build()}") - - tempSubscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) - } - - val publication = tempPublication!! - val subscription = tempSubscription!! + logger.trace("client pub URI: $ipType ${publicationUri.build()}") + val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + + + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + .controlEndpoint("$aeronAddressString:$subscriptionPort") + .controlMode("dynamic") + + logger.trace("client sub URI: $ipType ${subscriptionUri.build()}") + val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) + + + // We must add the subscription first, because we must be available to listen when the server responds. - // this will wait for the server to acknowledge the connection (all via aeron) val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) var startTime = System.nanoTime() + + // this will wait for the server to acknowledge the connection (all via aeron) while (System.nanoTime() - startTime < timoutInNanos) { if (subscription.isConnected) { success = true @@ -133,16 +121,16 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, if (!success) { subscription.close() - val ex = ClientTimedOutException("Cannot create subscription to $ip in $connectionTimeoutSec seconds") + val ex = ClientTimedOutException("Cannot create subscription to $ipType $addressString in $connectionTimeoutSec seconds") ListenerManager.cleanAllStackTrace(ex) throw ex } - success = false - // this will wait for the server to acknowledge the connection (all via aeron) + success = false startTime = System.nanoTime() + while (System.nanoTime() - startTime < timoutInNanos) { if (publication.isConnected) { success = true @@ -156,17 +144,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, subscription.close() publication.close() - val ex = ClientTimedOutException("Cannot create publication to $ip in $connectionTimeoutSec seconds") + val ex = ClientTimedOutException("Cannot create publication to $ipType $addressString in $connectionTimeoutSec seconds") ListenerManager.cleanAllStackTrace(ex) throw ex + } + + this.success = true this.publication = publication this.subscription = subscription - - this.tempPublication = null - this.tempSubscription = null } override val clientInfo: String by lazy { diff --git a/src/dorkbox/network/handshake/ClientHandshake.kt b/src/dorkbox/network/handshake/ClientHandshake.kt index 5c500327..cc40d32b 100644 --- a/src/dorkbox/network/handshake/ClientHandshake.kt +++ b/src/dorkbox/network/handshake/ClientHandshake.kt @@ -278,8 +278,6 @@ internal class ClientHandshake( throw exception } - logger.error{"[${subscription.streamId()}] handshake done"} - return connectionDone } diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 21222233..f6cf7fe3 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -104,7 +104,7 @@ internal class ServerHandshake(private val logger: KLog if (pendingConnection == null) { logger.error { "[${message.connectKey}] Error! Pending connection from client $connectionString was null, and cannot complete handshake!" } } else { - logger.trace { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." } + logger.debug { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." } pendingConnection.closeAction = { // called on connection.close() @@ -356,9 +356,7 @@ internal class ServerHandshake(private val logger: KLog // Manage the Handshake state - if (!validateMessageTypeAndDoPending( - server, server.actionDispatch, handshakePublication, message, clientAddressString, logger - )) { + if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, clientAddressString, logger)) { return } @@ -505,6 +503,8 @@ internal class ServerHandshake(private val logger: KLog // before we notify connect, we have to wait for the client to tell us that they can receive data pendingConnections[message.connectKey] = connection + logger.debug { "[${message.connectKey}] Connection (${connection.id}) responding to handshake hello." } + // this tells the client all the info to connect. server.writeHandshakeMessage(handshakePublication, successMessage) // exception is already caught } catch (e: Exception) { @@ -513,7 +513,7 @@ internal class ServerHandshake(private val logger: KLog sessionIdAllocator.free(connectionSessionId) streamIdAllocator.free(connectionStreamId) - logger.error(e) { "Connection handshake from $clientAddressString crashed! Message $message" } + logger.error(e) { "[${message.connectKey}] Connection (${connection?.id}) handshake from $clientAddressString crashed! Message $message" } } }