diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index d7638b94..55b3c5aa 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -521,10 +521,16 @@ open class Client( // once we're done with the connection process, stop trying break } catch (e: ClientRetryException) { - if (logger.isTraceEnabled) { - logger.trace(e) { "Unable to connect to $type, retrying..." } + val message = if (isIPC) { + "Unable to connect to IPC $ipcId, retrying..." } else { - logger.info { "Unable to connect to $type, retrying..." } + "Unable to connect to UDP $remoteAddressPrettyString, retrying..." + } + + if (logger.isTraceEnabled) { + logger.trace(e) { message } + } else { + logger.info { message } } handshake.reset() @@ -548,7 +554,7 @@ open class Client( throw e } } catch (e: Exception) { - logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake with $type. Aborting." } + logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting." } aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it @@ -559,6 +565,11 @@ open class Client( if (!success) { if (System.nanoTime() - startTime < timoutInNanos) { + val type = if (isIPC) { + "IPC $ipcId" + } else { + remoteAddressPrettyString + ":" + config.port + } // we timed out. Throw the appropriate exception val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds") logger.error(exception) { "Aborting connection attempt to server." } diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 3048f808..7d08fa2f 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -293,7 +293,7 @@ open class Server( pollCount += connection.poll() } else { // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted. - logger.debug { "[${connection.details}] connection expired" } + logger.debug { "[${connection.details}] connection expired (cleanup)" } // the connection MUST be removed in the same thread that is processing events removeConnection(connection) diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index 1a5295b7..15433e6d 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -423,6 +423,9 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger } } + override fun toString(): String { + return internal.toString() + } /** * A safer way to try to close the media driver diff --git a/src/dorkbox/network/aeron/AeronDriverInternal.kt b/src/dorkbox/network/aeron/AeronDriverInternal.kt index 08d47902..f8bb667c 100644 --- a/src/dorkbox/network/aeron/AeronDriverInternal.kt +++ b/src/dorkbox/network/aeron/AeronDriverInternal.kt @@ -88,6 +88,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } } + val driverId = config.id + private val endPointUsages = mutableListOf>() @Volatile @@ -166,13 +168,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C * @return true if we are successfully connected to the aeron client */ suspend fun start(logger: KLogger): Boolean = stateMutex.withLock { - require(!closed) { "Cannot start a driver that was closed. A new driver + context must be created" } - - val driverId = config.id + require(!closed) { "Aeron Driver [$driverId]: Cannot start a driver that was closed. A new driver + context must be created" } val isLoaded = mediaDriver != null && aeron != null && aeron?.isClosed == false if (isLoaded) { - logger.debug { "Aeron Media driver [$driverId] already running... Not starting again." } + logger.debug { "Aeron Driver [$driverId]: Already running... Not starting again." } return true } @@ -184,7 +184,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // wait for a bit, because we are running, but we ALSO issued a START, and expect it to start. // SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to // that instance - logger.debug { "Aeron Media driver [$driverId] already running. Double checking status..." } + logger.debug { "Aeron Driver [$driverId]: Already running. Double checking status..." } delay(context.driverTimeout / 2) running = isRunning() } @@ -195,20 +195,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C while (count-- > 0) { try { mediaDriver = MediaDriver.launch(context.context) - logger.debug { "Successfully started the Aeron Media driver [$driverId]" } + logger.debug { "Aeron Driver [$driverId]: Successfully started" } break } catch (e: Exception) { - logger.warn(e) { "Unable to start the Aeron Media driver [$driverId] at ${context.directory}. Retrying $count more times..." } + logger.warn(e) { "Aeron Driver [$driverId]: Unable to start at ${context.directory}. Retrying $count more times..." } delay(context.driverTimeout) } } } else { - logger.debug { "Not starting Aeron Media driver [$driverId]. It was already running." } + logger.debug { "Aeron Driver [$driverId]: Not starting. It was already running." } } // if we were unable to load the aeron driver, don't continue. if (!running && mediaDriver == null) { - logger.error { "Not running and unable to start the Aeron Media driver [$driverId] at ${context.directory}." } + logger.error { "Aeron Driver [$driverId]: Not running and unable to start at ${context.directory}." } return false } } @@ -234,7 +234,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // this might succeed if we can connect to the media driver aeron = Aeron.connect(aeronDriverContext) - logger.debug { "Connected with the Aeron driver [$driverId] at '${context.directory}'" } + logger.debug { "Aeron Driver [$driverId]: Connected to '${context.directory}'" } return true } @@ -242,7 +242,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication { val uri = publicationUri.build() - logger.trace { "$aeronLogInfo pub URI: $uri,stream-id=$streamId" } // reasons we cannot add a pub/sub to aeron // 1) the driver was closed @@ -258,7 +257,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C val aeron1 = aeron if (aeron1 == null || aeron1.isClosed) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a publication to aeron") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication to aeron") ex.cleanAllStackTrace() throw ex } @@ -268,14 +267,14 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } catch (e: Exception) { // this happens if the aeron media driver cannot actually establish connection... OR IF IT IS TOO FAST BETWEEN ADD AND REMOVE FOR THE SAME SESSION/STREAM ID! e.cleanAllStackTrace() - val ex = ClientRetryException("Error adding a publication", e) + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication", e) ex.cleanAllStackTrace() throw ex } if (publication == null) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a publication") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication") ex.cleanAllStackTrace() throw ex } @@ -306,7 +305,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C val aeron1 = aeron if (aeron1 == null || aeron1.isClosed) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a publication to aeron") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication to aeron") ex.cleanAllStackTrace() throw ex } @@ -334,7 +333,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C if (publication == null) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a publication") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication") ex.cleanAllStackTrace() throw ex } @@ -366,7 +365,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C val aeron1 = aeron if (aeron1 == null || aeron1.isClosed) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a subscription to aeron") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription to aeron") ex.cleanAllStackTrace() throw ex } @@ -375,14 +374,14 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C aeron1.addSubscription(uri, streamId) } catch (e: Exception) { e.cleanAllStackTrace() - val ex = ClientRetryException("Error adding a subscription", e) + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription", e) ex.cleanAllStackTrace() throw ex } if (subscription == null) { // there was an error connecting to the aeron client or media driver. - val ex = ClientRetryException("Error adding a subscription") + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription") ex.cleanAllStackTrace() throw ex } @@ -403,7 +402,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // This can throw exceptions! publication.close() } catch (e: Exception) { - logger.error(e) { "Unable to close [$aeronLogInfo] publication $publication" } + logger.error(e) { "Aeron Driver [$driverId]: Unable to close [$aeronLogInfo] publication $publication" } } ensureLogfileDeleted("publication", getMediaDriverFile(publication), aeronLogInfo, logger) @@ -413,7 +412,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C * Guarantee that the publication is closed AND the backing file is removed */ suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String, logger: KLogger) { - logger.trace { "Closing subscription [$aeronLogInfo] ::regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" } + logger.trace { "Aeron Driver [$driverId]: Closing subscription [$aeronLogInfo] ::regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" } try { // This can throw exceptions! @@ -443,7 +442,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } if (logFile.exists()) { - logger.error("[$aeronLogInfo] Unable to delete aeron $type log on close: $logFile") + logger.error("Aeron Driver [$driverId]: [$aeronLogInfo] Unable to delete aeron $type log on close: $logFile") } pubSubs.decrementAndGet() @@ -467,7 +466,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // only emit the log info once. It's rather spammy otherwise! if (!didLog) { didLog = true - logger.debug("Aeron was still running. Waiting for it to stop...") + logger.debug("Aeron Driver [$driverId]: Still running. Waiting for it to stop...") } delay(intervalTimeoutMS) } @@ -488,7 +487,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C suspend fun isInUse(logger: KLogger): Boolean { // as many "sort-cuts" as we can for checking if the current Aeron Driver/client is still in use if (!isRunning()) { - logger.trace { "is not running" } + logger.trace { "Aeron Driver [$driverId]: not running" } return false } @@ -499,7 +498,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } if (endPointUsages.isNotEmpty()) { - logger.debug { "Aeron driver [$driverId] still referenced by ${endPointUsages.size} endpoints" } + logger.debug { "Aeron Driver [$driverId]: Still referenced by ${endPointUsages.size} endpoints" } return true } @@ -511,7 +510,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C var count = 3 while (count > 0 && currentUsage > 0) { - logger.debug { "Aeron driver is still in use, double checking status" } + logger.debug { "Aeron Driver [$driverId]: in use, double checking status" } delayTimeout(.5) currentUsage = driverBacklog()?.snapshot()?.size ?: 0 count-- @@ -519,7 +518,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C count = 3 while (count > 0 && currentUsage > 0) { - logger.debug { "Aeron driver is still in use, double checking status (long)" } + logger.debug { "Aeron Driver [$driverId]: in use, double checking status (long)" } delayLingerTimeout() currentUsage = driverBacklog()?.snapshot()?.size ?: 0 count-- @@ -527,7 +526,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C val isInUse = currentUsage > 0 if (isInUse) { - logger.debug { "Aeron driver location usage is: $currentUsage" } + logger.debug { "Aeron Driver [$driverId]: usage is: $currentUsage" } } return isInUse } @@ -541,24 +540,24 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C suspend fun close(endPoint: EndPoint<*>?, logger: KLogger) = stateMutex.withLock { val driverId = config.id - logger.trace { "Aeron driver [$driverId] requested close... (${endPointUsages.size} endpoints in use)" } + logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size} endpoints in use)" } if (endPoint != null) { endPointUsages.remove(endPoint) } if (isInUse(logger)) { - logger.debug { "Aeron driver [$driverId] still in use, not shutting down this instance." } + logger.debug { "Aeron Driver [$driverId]: in use, not shutting down this instance." } return } val removed = AeronDriver.driverConfigurations.remove(driverId) if (removed == null) { - logger.debug { "Aeron driver [$driverId] was already closed. Ignoring close request." } + logger.debug { "Aeron Driver [$driverId]: already closed. Ignoring close request." } return } - logger.debug { "Aeron driver [$driverId] closing..." } + logger.debug { "Aeron Driver [$driverId]: Closing..." } // we have to assign context BEFORE we close, because the `getter` for context will create it if necessary val aeronContext = context @@ -567,22 +566,22 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C try { aeron?.close() } catch (e: Exception) { - logger.error(e) { "Error stopping Aeron driver [$driverId]." } + logger.error(e) { "Aeron Driver [$driverId]: Error stopping!" } } aeron = null if (mediaDriver == null) { - logger.debug { "No driver started for Aeron driver [$driverId]. Not Stopping." } + logger.debug { "Aeron Driver [$driverId]: No driver started, not Stopping." } return } - logger.debug { "Stopping driver [$driverId] at '${driverDirectory}'..." } + logger.debug { "Aeron Driver [$driverId]: Stopping driver at '${driverDirectory}'..." } if (!isRunning()) { // not running - logger.debug { "Driver [$driverId] is not running at '${driverDirectory}' for this context. Not Stopping." } + logger.debug { "Aeron Driver [$driverId]: is not running at '${driverDirectory}' for this context. Not Stopping." } return } @@ -590,7 +589,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C try { mediaDriver!!.close() } catch (e: Exception) { - logger.error(e) { "Error closing the Aeron media driver [$driverId]" } + logger.error(e) { "Aeron Driver [$driverId]: Error closing" } } mediaDriver = null @@ -610,7 +609,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // wait for the media driver to actually stop var count = 10 while (--count >= 0 && isRunning()) { - logger.warn { "Aeron Media driver [$driverId] at '${driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." } + logger.warn { "Aeron Driver [$driverId]: still running at '${driverDirectory}'. Waiting for it to stop. Trying to close $count more times." } delay(timeout) } @@ -623,15 +622,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C try { val deletedAeron = driverDirectory.deleteRecursively() if (!deletedAeron) { - logger.error { "Error deleting aeron directory $driverDirectory on shutdown "} + logger.error { "Aeron Driver [$driverId]: Error deleting aeron directory $driverDirectory on shutdown "} } } catch (e: Exception) { - logger.error(e) { "Error deleting Aeron directory at: $driverDirectory"} + logger.error(e) { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"} } if (driverDirectory.isDirectory) { - logger.error { "Error deleting Aeron directory at: $driverDirectory"} + logger.error { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"} } logger.debug { "Closed the media driver [$driverId] at '${driverDirectory}'" } diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index a69befc3..ada2f824 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -435,9 +435,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // This is set by the client/server so if there is a "connect()" call in the disconnect callback, we can have proper // lock-stop ordering for how disconnect and connect work with each-other - logger.debug {"[$details] connection closed"} - closeAction() + + logger.debug {"[$details] connection closed"} } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index c1f07baa..8982de72 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -331,7 +331,7 @@ internal constructor(val type: Class<*>, @Suppress("LeakingThis") aeronDriver = AeronDriver(this) } catch (e: Exception) { - logger.error("Error initializing endpoint", e) + logger.error("Error initializing aeron driver", e) throw e } } diff --git a/src/dorkbox/network/handshake/RandomId65kAllocator.kt b/src/dorkbox/network/handshake/RandomId65kAllocator.kt index 570b36d9..234ef5db 100644 --- a/src/dorkbox/network/handshake/RandomId65kAllocator.kt +++ b/src/dorkbox/network/handshake/RandomId65kAllocator.kt @@ -94,7 +94,7 @@ class RandomId65kAllocator(private val min: Int, max: Int) { fun free(id: Int) { val assigned = assigned.decrementAndGet() if (assigned < 0) { - throw AllocationException("Unequal allocate/free method calls.") + throw AllocationException("Unequal allocate/free method calls (too many 'free' calls).") } cache.put(id) debugChecks.remove(id) diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index 2f0b58d5..bfceff54 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -148,6 +148,7 @@ internal object ServerHandshakePollers { val listenAddress = mediaDriver.listenAddress val listenAddressString = IP.toString(listenAddress) val timoutInNanos = driver.getLingerNs() + val logInfo = mediaDriver.logInfo suspend fun process(header: Header, buffer: DirectBuffer, offset: Int, length: Int) { // this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe! @@ -250,11 +251,11 @@ internal object ServerHandshakePollers { logger = logger ) } else { - logger.error { "Cannot create publication back to '$clientAddressString'" } + logger.error { "Cannot create $logInfo publication back to '$clientAddressString'" } } // publications are REMOVED from Aeron clients when their linger timeout has expired!!! - driver.closeAndDeletePublication(publication, "HANDSHAKE-$type") + driver.closeAndDeletePublication(publication, logInfo) } } }