cleaned up stacktrace, code polish

This commit is contained in:
Robinson 2021-04-29 01:46:37 +02:00
parent 9c7fa4de8d
commit 37fbfe6ac1
3 changed files with 45 additions and 26 deletions

View File

@ -17,6 +17,7 @@
package dorkbox.network.aeron package dorkbox.network.aeron
import dorkbox.netUtil.IP import dorkbox.netUtil.IP
import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder import io.aeron.ChannelUriStringBuilder
@ -68,7 +69,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
return builder return builder
} }
val ip: String by lazy {
if (address is Inet4Address) {
"IPv4"
} else {
"IPv6"
}
}
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
@Throws(ClientException::class)
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = aeronConnectionString(address) val aeronAddressString = aeronConnectionString(address)
@ -85,13 +96,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
if (logger.isTraceEnabled) { if (logger.isTraceEnabled) {
if (address is Inet4Address) { logger.trace("client pub URI: $ip ${publicationUri.build()}")
logger.trace("IPV4 client pub URI: ${publicationUri.build()}") logger.trace("client sub URI: $ip ${subscriptionUri.build()}")
logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}")
} else {
logger.trace("IPV6 client pub URI: ${publicationUri.build()}")
logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}")
}
} }
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
@ -117,7 +123,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
if (!success) { if (!success) {
subscription.close() subscription.close()
throw ClientTimedOutException("Cannot create subscription!") val ex = ClientTimedOutException("Cannot create subscription: $ip ${subscriptionUri.build()}")
ListenerManager.cleanStackTrace(ex)
throw ex
} }
@ -137,7 +145,9 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
if (!success) { if (!success) {
subscription.close() subscription.close()
publication.close() publication.close()
throw ClientTimedOutException("Creating publication connection to aeron") val ex = ClientTimedOutException("Cannot create publication: $ip ${publicationUri.build()}")
// ListenerManager.cleanStackTrace(ex)
throw ex
} }
this.success = true this.success = true
@ -147,12 +157,10 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
} }
override fun clientInfo(): String { override fun clientInfo(): String {
address
return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" "Connecting to $addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else { } else {
"Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" "Connecting handshake to $addressString [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
} }
} }

View File

@ -144,10 +144,17 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
return config.settingsStore.create(logger) return config.settingsStore.create(logger)
} }
internal fun initEndpointState() { internal fun initEndpointState() {
shutdown.getAndSet(false) shutdown.getAndSet(false)
shutdownWaiter = SuspendWaiter() shutdownWaiter = SuspendWaiter()
// Only starts the media driver if we are NOT already running!
try {
aeronDriver.start()
} catch (e: Exception) {
listenerManager.notifyError(e)
throw e
}
} }
abstract fun newException(message: String, cause: Throwable? = null): Throwable abstract fun newException(message: String, cause: Throwable? = null): Throwable
@ -158,7 +165,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
removeConnection(connection as CONNECTION) removeConnection(connection as CONNECTION)
} }
/** /**
* Adds a custom connection to the server. * Adds a custom connection to the server.
* *
@ -347,7 +353,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
} }
} catch (e: Exception) { } catch (e: Exception) {
val exception = newException("[${publication.sessionId()}] Error serializing handshake message $message", e) val exception = newException("[${publication.sessionId()}] Error serializing handshake message $message", e)
ListenerManager.cleanStackTrace(exception) ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException`
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
} finally { } finally {
sendIdleStrategy.reset() sendIdleStrategy.reset()
@ -377,7 +383,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
val sessionId = header.sessionId() val sessionId = header.sessionId()
val exception = newException("[${sessionId}] Error de-serializing message", e) val exception = newException("[${sessionId}] Error de-serializing message", e)
ListenerManager.cleanStackTrace(exception) ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException`
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
logger.error("Error de-serializing message on connection ${header.sessionId()}!", e) logger.error("Error de-serializing message on connection ${header.sessionId()}!", e)
@ -410,7 +416,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
val sessionId = header.sessionId() val sessionId = header.sessionId()
val exception = newException("[${sessionId}] Error de-serializing message", e) val exception = newException("[${sessionId}] Error de-serializing message", e)
ListenerManager.cleanStackTrace(exception) ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException`
listenerManager.notifyError(connection, exception) listenerManager.notifyError(connection, exception)
return // don't do anything! return // don't do anything!
@ -507,9 +513,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})" val errorMessage = "[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})"
// either client or server=. No other choices. We create an exception, because it's more useful! // either client or server. No other choices. We create an exception, because it's more useful!
val exception = newException(errorMessage) val exception = newException(errorMessage)
ListenerManager.cleanStackTrace(exception, 1)
// 2 because we do not want to see the stack for the abstract `newException`
// 2 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 4)
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
listenerManager.notifyError(connection as CONNECTION, exception) listenerManager.notifyError(connection as CONNECTION, exception)

View File

@ -51,7 +51,7 @@ internal class ListenerManager<CONNECTION: Connection> {
* *
* Neither of these are useful in resolving exception handling from a users perspective, and only clutter the stacktrace. * Neither of these are useful in resolving exception handling from a users perspective, and only clutter the stacktrace.
*/ */
fun cleanStackTrace(throwable: Throwable) { fun cleanStackTrace(throwable: Throwable, adjustedStartOfStack: Int = 0) {
// we never care about coroutine stacks, so filter then to start with // we never care about coroutine stacks, so filter then to start with
val stackTrace = throwable.stackTrace.filterNot { val stackTrace = throwable.stackTrace.filterNot {
val stackName = it.className val stackName = it.className
@ -63,11 +63,12 @@ internal class ListenerManager<CONNECTION: Connection> {
var newEndIndex = stackTrace.size - 1 var newEndIndex = stackTrace.size - 1
// maybe offset by 1 because we have to adjust coroutine calls // maybe offset by 1 because we have to adjust coroutine calls
var newStartIndex = 0 var newStartIndex = adjustedStartOfStack
val savedFirstStack = if (stackTrace[0].methodName == "invokeSuspend") { // sometimes we want to see the VERY first invocation, but not always
newStartIndex = 1 val savedFirstStack = if (stackTrace[newStartIndex].methodName == "invokeSuspend") {
stackTrace.copyOfRange(0, 1) newStartIndex++
stackTrace.copyOfRange(adjustedStartOfStack, newStartIndex)
} else { } else {
null null
} }
@ -84,7 +85,7 @@ internal class ListenerManager<CONNECTION: Connection> {
if (newEndIndex > 0) { if (newEndIndex > 0) {
if (savedFirstStack != null) { if (savedFirstStack != null) {
// we want to save the FIRST stack frame also, maybe // we want to save the FIRST stack frame also, maybe
throwable.stackTrace = stackTrace.copyOfRange(0, 1) + stackTrace.copyOfRange(newStartIndex, newEndIndex) throwable.stackTrace = savedFirstStack + stackTrace.copyOfRange(newStartIndex, newEndIndex)
} else { } else {
throwable.stackTrace = stackTrace.copyOfRange(newStartIndex, newEndIndex) throwable.stackTrace = stackTrace.copyOfRange(newStartIndex, newEndIndex)
} }