diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index 704d952a..e878bd93 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -17,6 +17,7 @@ package dorkbox.network.aeron import dorkbox.netUtil.IP +import dorkbox.network.connection.ListenerManager import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.ChannelUriStringBuilder @@ -68,7 +69,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, return builder } + val ip: String by lazy { + if (address is Inet4Address) { + "IPv4" + } else { + "IPv6" + } + } + + @Suppress("DuplicatedCode") + @Throws(ClientException::class) override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { val aeronAddressString = aeronConnectionString(address) @@ -85,13 +96,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, if (logger.isTraceEnabled) { - if (address is Inet4Address) { - logger.trace("IPV4 client pub URI: ${publicationUri.build()}") - logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}") - } else { - logger.trace("IPV6 client pub URI: ${publicationUri.build()}") - logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}") - } + logger.trace("client pub URI: $ip ${publicationUri.build()}") + logger.trace("client sub URI: $ip ${subscriptionUri.build()}") } // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe @@ -117,7 +123,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, if (!success) { subscription.close() - throw ClientTimedOutException("Cannot create subscription!") + val ex = ClientTimedOutException("Cannot create subscription: $ip ${subscriptionUri.build()}") + ListenerManager.cleanStackTrace(ex) + throw ex } @@ -137,7 +145,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, if (!success) { subscription.close() publication.close() - throw ClientTimedOutException("Creating publication connection to aeron") + val ex = ClientTimedOutException("Cannot create publication: $ip ${publicationUri.build()}") +// ListenerManager.cleanStackTrace(ex) + throw ex } this.success = true @@ -147,12 +157,10 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } override fun clientInfo(): String { - address - return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { - "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + "Connecting to $addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" } else { - "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" + "Connecting handshake to $addressString [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" } } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index b0093797..478da024 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -144,10 +144,17 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A return config.settingsStore.create(logger) } - internal fun initEndpointState() { shutdown.getAndSet(false) shutdownWaiter = SuspendWaiter() + + // Only starts the media driver if we are NOT already running! + try { + aeronDriver.start() + } catch (e: Exception) { + listenerManager.notifyError(e) + throw e + } } abstract fun newException(message: String, cause: Throwable? = null): Throwable @@ -158,7 +165,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A removeConnection(connection as CONNECTION) } - /** * Adds a custom connection to the server. * @@ -347,7 +353,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } } catch (e: Exception) { val exception = newException("[${publication.sessionId()}] Error serializing handshake message $message", e) - ListenerManager.cleanStackTrace(exception) + ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException` listenerManager.notifyError(exception) } finally { sendIdleStrategy.reset() @@ -377,7 +383,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A val sessionId = header.sessionId() val exception = newException("[${sessionId}] Error de-serializing message", e) - ListenerManager.cleanStackTrace(exception) + ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException` listenerManager.notifyError(exception) logger.error("Error de-serializing message on connection ${header.sessionId()}!", e) @@ -410,7 +416,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A val sessionId = header.sessionId() val exception = newException("[${sessionId}] Error de-serializing message", e) - ListenerManager.cleanStackTrace(exception) + ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException` listenerManager.notifyError(connection, exception) return // don't do anything! @@ -507,9 +513,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // more critical error sending the message. we shouldn't retry or anything. val errorMessage = "[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})" - // either client or server=. No other choices. We create an exception, because it's more useful! + // either client or server. No other choices. We create an exception, because it's more useful! val exception = newException(errorMessage) - ListenerManager.cleanStackTrace(exception, 1) + + // 2 because we do not want to see the stack for the abstract `newException` + // 2 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 4) @Suppress("UNCHECKED_CAST") listenerManager.notifyError(connection as CONNECTION, exception) diff --git a/src/dorkbox/network/connection/ListenerManager.kt b/src/dorkbox/network/connection/ListenerManager.kt index 9b5cf015..e8ff9d93 100644 --- a/src/dorkbox/network/connection/ListenerManager.kt +++ b/src/dorkbox/network/connection/ListenerManager.kt @@ -51,7 +51,7 @@ internal class ListenerManager { * * Neither of these are useful in resolving exception handling from a users perspective, and only clutter the stacktrace. */ - fun cleanStackTrace(throwable: Throwable) { + fun cleanStackTrace(throwable: Throwable, adjustedStartOfStack: Int = 0) { // we never care about coroutine stacks, so filter then to start with val stackTrace = throwable.stackTrace.filterNot { val stackName = it.className @@ -63,11 +63,12 @@ internal class ListenerManager { var newEndIndex = stackTrace.size - 1 // maybe offset by 1 because we have to adjust coroutine calls - var newStartIndex = 0 + var newStartIndex = adjustedStartOfStack - val savedFirstStack = if (stackTrace[0].methodName == "invokeSuspend") { - newStartIndex = 1 - stackTrace.copyOfRange(0, 1) + // sometimes we want to see the VERY first invocation, but not always + val savedFirstStack = if (stackTrace[newStartIndex].methodName == "invokeSuspend") { + newStartIndex++ + stackTrace.copyOfRange(adjustedStartOfStack, newStartIndex) } else { null } @@ -84,7 +85,7 @@ internal class ListenerManager { if (newEndIndex > 0) { if (savedFirstStack != null) { // we want to save the FIRST stack frame also, maybe - throwable.stackTrace = stackTrace.copyOfRange(0, 1) + stackTrace.copyOfRange(newStartIndex, newEndIndex) + throwable.stackTrace = savedFirstStack + stackTrace.copyOfRange(newStartIndex, newEndIndex) } else { throwable.stackTrace = stackTrace.copyOfRange(newStartIndex, newEndIndex) }