Formatting and logging cleanup

This commit is contained in:
Robinson 2022-06-27 00:11:14 +02:00
parent 4a0357d92b
commit 3e092ffd54
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
6 changed files with 54 additions and 65 deletions

View File

@ -628,9 +628,9 @@ open class Client<CONNECTION : Connection>(
// because we are getting the class registration details from the SERVER, this should never be the case. // because we are getting the class registration details from the SERVER, this should never be the case.
// It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
val exception = if (isUsingIPC) { val exception = if (isUsingIPC) {
ClientRejectedException("Connection to IPC has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!")
} else { } else {
ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to $remoteAddressString has incorrect class registration details!!")
} }
ListenerManager.cleanStackTraceInternal(exception) ListenerManager.cleanStackTraceInternal(exception)
throw exception throw exception
@ -649,13 +649,13 @@ open class Client<CONNECTION : Connection>(
val permitConnection = listenerManager.notifyFilter(newConnection) val permitConnection = listenerManager.notifyFilter(newConnection)
if (!permitConnection) { if (!permitConnection) {
handshakeConnection.close() handshakeConnection.close()
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!") val exception = ClientRejectedException("[${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString was not permitted!")
ListenerManager.cleanStackTrace(exception) ListenerManager.cleanStackTrace(exception)
logger.error(exception) { "Permission error" } logger.error(exception) { "Permission error" }
throw exception throw exception
} }
logger.info { "Adding new signature for ${IP.toString(remoteAddress!!)} : ${connectionInfo.publicKey.toHexString()}" } logger.info { "[${handshake.connectKey}] Connection (${newConnection.id}) : Adding new signature for ${IP.toString(remoteAddress!!)} : ${connectionInfo.publicKey.toHexString()}" }
storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey) storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
} }
@ -668,7 +668,7 @@ open class Client<CONNECTION : Connection>(
// on the client, we want to GUARANTEE that the disconnect happens-before connect. // on the client, we want to GUARANTEE that the disconnect happens-before connect.
if (!lockStepForConnect.compareAndSet(null, Mutex(locked = true))) { if (!lockStepForConnect.compareAndSet(null, Mutex(locked = true))) {
logger.error { "Connection ${newConnection.id} : close lockStep for disconnect was in the wrong state!" } logger.error { "[${handshake.connectKey}] Connection ${newConnection.id} : close lockStep for disconnect was in the wrong state!" }
} }
isConnected = false isConnected = false
@ -689,8 +689,6 @@ open class Client<CONNECTION : Connection>(
connection0 = newConnection connection0 = newConnection
addConnection(newConnection) addConnection(newConnection)
logger.error { "Connection created, finishing handshake: ${handshake.connectKey}" }
// tell the server our connection handshake is done, and the connection can now listen for data. // tell the server our connection handshake is done, and the connection can now listen for data.
// also closes the handshake (will also throw connect timeout exception) // also closes the handshake (will also throw connect timeout exception)
val canFinishConnecting: Boolean val canFinishConnecting: Boolean
@ -708,7 +706,7 @@ open class Client<CONNECTION : Connection>(
if (canFinishConnecting) { if (canFinishConnecting) {
isConnected = true isConnected = true
logger.debug { "[${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString done with handshake." }
// this forces the current thread to WAIT until poll system has started // this forces the current thread to WAIT until poll system has started
val mutex = Mutex(locked = true) val mutex = Mutex(locked = true)

View File

@ -70,11 +70,16 @@ internal open class IpcMediaDriverConnection(streamId: Int,
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription) val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription)
// this will wait for the server to acknowledge the connection (all via aeron)
// We must add the subscription first, because we must be available to listen when the server responds.
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime() var startTime = System.nanoTime()
// this will wait for the server to acknowledge the connection (all via aeron)
while (System.nanoTime() - startTime < timoutInNanos) { while (System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected && subscription.imageCount() > 0) { if (subscription.isConnected) {
success = true success = true
break break
} }
@ -82,20 +87,20 @@ internal open class IpcMediaDriverConnection(streamId: Int,
delay(500L) delay(500L)
} }
if (!success) { if (!success) {
subscription.close() subscription.close()
val clientTimedOutException = ClientTimedOutException("Creating subscription connection to aeron") val clientTimedOutException = ClientTimedOutException("Cannot create subscription IPC connection to server")
ListenerManager.cleanStackTraceInternal(clientTimedOutException) ListenerManager.cleanAllStackTrace(clientTimedOutException)
throw clientTimedOutException throw clientTimedOutException
} }
success = false
// this will wait for the server to acknowledge the connection (all via aeron)
success = false
startTime = System.nanoTime() startTime = System.nanoTime()
while (System.nanoTime() - startTime < timoutInNanos) { while (System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) { if (publication.isConnected) {
success = true success = true
@ -109,8 +114,8 @@ internal open class IpcMediaDriverConnection(streamId: Int,
subscription.close() subscription.close()
publication.close() publication.close()
val clientTimedOutException = ClientTimedOutException("Creating publication connection to aeron") val clientTimedOutException = ClientTimedOutException("Cannot create publication IPC connection to server")
ListenerManager.cleanStackTraceInternal(clientTimedOutException) ListenerManager.cleanAllStackTrace(clientTimedOutException)
throw clientTimedOutException throw clientTimedOutException
} }

View File

@ -38,8 +38,8 @@ abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionP
override fun close() { override fun close() {
if (success) { if (success) {
publication.close()
subscription.close() subscription.close()
publication.close()
} }
} }
} }

View File

@ -21,8 +21,6 @@ import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder import io.aeron.ChannelUriStringBuilder
import io.aeron.Publication
import io.aeron.Subscription
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import mu.KLogger import mu.KLogger
import java.net.Inet4Address import java.net.Inet4Address
@ -42,12 +40,6 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
isReliable: Boolean = true) : isReliable: Boolean = true) :
MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
@Volatile
private var tempSubscription: Subscription? = null
@Volatile
private var tempPublication: Publication? = null
val addressString: String by lazy { val addressString: String by lazy {
IP.toString(address) IP.toString(address)
} }
@ -61,7 +53,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
return builder return builder
} }
val ip: String by lazy { val ipType: String by lazy {
if (address is Inet4Address) { if (address is Inet4Address) {
"IPv4" "IPv4"
} else { } else {
@ -92,35 +84,31 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// on close, the publication CAN linger (in case a client goes away, and then comes back) // on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
if (tempPublication == null) {
// Create a publication at the given address and port, using the given stream ID. // Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri() val publicationUri = uri()
.endpoint("$aeronAddressString:$publicationPort") .endpoint("$aeronAddressString:$publicationPort")
logger.trace("client pub URI: $ip ${publicationUri.build()}") logger.trace("client pub URI: $ipType ${publicationUri.build()}")
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
tempPublication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
}
if (tempSubscription == null) {
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri() val subscriptionUri = uri()
.controlEndpoint("$aeronAddressString:$subscriptionPort") .controlEndpoint("$aeronAddressString:$subscriptionPort")
.controlMode("dynamic") .controlMode("dynamic")
logger.trace("client sub URI: $ip ${subscriptionUri.build()}") logger.trace("client sub URI: $ipType ${subscriptionUri.build()}")
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
tempSubscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
}
// We must add the subscription first, because we must be available to listen when the server responds.
val publication = tempPublication!!
val subscription = tempSubscription!!
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
var startTime = System.nanoTime() var startTime = System.nanoTime()
// this will wait for the server to acknowledge the connection (all via aeron)
while (System.nanoTime() - startTime < timoutInNanos) { while (System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected) { if (subscription.isConnected) {
success = true success = true
@ -133,16 +121,16 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
if (!success) { if (!success) {
subscription.close() subscription.close()
val ex = ClientTimedOutException("Cannot create subscription to $ip in $connectionTimeoutSec seconds") val ex = ClientTimedOutException("Cannot create subscription to $ipType $addressString in $connectionTimeoutSec seconds")
ListenerManager.cleanAllStackTrace(ex) ListenerManager.cleanAllStackTrace(ex)
throw ex throw ex
} }
success = false
// this will wait for the server to acknowledge the connection (all via aeron) success = false
startTime = System.nanoTime() startTime = System.nanoTime()
while (System.nanoTime() - startTime < timoutInNanos) { while (System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) { if (publication.isConnected) {
success = true success = true
@ -156,17 +144,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
subscription.close() subscription.close()
publication.close() publication.close()
val ex = ClientTimedOutException("Cannot create publication to $ip in $connectionTimeoutSec seconds") val ex = ClientTimedOutException("Cannot create publication to $ipType $addressString in $connectionTimeoutSec seconds")
ListenerManager.cleanAllStackTrace(ex) ListenerManager.cleanAllStackTrace(ex)
throw ex throw ex
} }
this.success = true this.success = true
this.publication = publication this.publication = publication
this.subscription = subscription this.subscription = subscription
this.tempPublication = null
this.tempSubscription = null
} }
override val clientInfo: String by lazy { override val clientInfo: String by lazy {

View File

@ -278,8 +278,6 @@ internal class ClientHandshake<CONNECTION: Connection>(
throw exception throw exception
} }
logger.error{"[${subscription.streamId()}] handshake done"}
return connectionDone return connectionDone
} }

View File

@ -104,7 +104,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
if (pendingConnection == null) { if (pendingConnection == null) {
logger.error { "[${message.connectKey}] Error! Pending connection from client $connectionString was null, and cannot complete handshake!" } logger.error { "[${message.connectKey}] Error! Pending connection from client $connectionString was null, and cannot complete handshake!" }
} else { } else {
logger.trace { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." } logger.debug { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." }
pendingConnection.closeAction = { pendingConnection.closeAction = {
// called on connection.close() // called on connection.close()
@ -356,9 +356,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// Manage the Handshake state // Manage the Handshake state
if (!validateMessageTypeAndDoPending( if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, clientAddressString, logger)) {
server, server.actionDispatch, handshakePublication, message, clientAddressString, logger
)) {
return return
} }
@ -505,6 +503,8 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// before we notify connect, we have to wait for the client to tell us that they can receive data // before we notify connect, we have to wait for the client to tell us that they can receive data
pendingConnections[message.connectKey] = connection pendingConnections[message.connectKey] = connection
logger.debug { "[${message.connectKey}] Connection (${connection.id}) responding to handshake hello." }
// this tells the client all the info to connect. // this tells the client all the info to connect.
server.writeHandshakeMessage(handshakePublication, successMessage) // exception is already caught server.writeHandshakeMessage(handshakePublication, successMessage) // exception is already caught
} catch (e: Exception) { } catch (e: Exception) {
@ -513,7 +513,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
sessionIdAllocator.free(connectionSessionId) sessionIdAllocator.free(connectionSessionId)
streamIdAllocator.free(connectionStreamId) streamIdAllocator.free(connectionStreamId)
logger.error(e) { "Connection handshake from $clientAddressString crashed! Message $message" } logger.error(e) { "[${message.connectKey}] Connection (${connection?.id}) handshake from $clientAddressString crashed! Message $message" }
} }
} }