diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index 65c19813..0730fbec 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -477,7 +477,7 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger delay(200L) } - closeAndDeletePublication(publication, logInfo) + close(publication, logInfo) val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in $handshakeTimeoutSec seconds while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}")) exception.cleanAllStackTrace() @@ -527,8 +527,8 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger * * This can throw exceptions! */ - suspend fun closeAndDeletePublication(publication: Publication, logInfo: String) { - internal.closeAndDeletePublication(publication, logger, logInfo) + suspend fun close(publication: Publication, logInfo: String) { + internal.close(publication, logger, logInfo) } /** @@ -536,8 +536,8 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger * * This can throw exceptions! */ - suspend fun closeAndDeleteSubscription(subscription: Subscription, logInfo: String) { - internal.closeAndDeleteSubscription(subscription, logger, logInfo) + suspend fun close(subscription: Subscription, logInfo: String) { + internal.close(subscription, logger, logInfo) } @@ -565,6 +565,20 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger suspend fun isInUse(): Boolean = internal.isInUse(logger) + /** + * @return the aeron media driver log file for a specific publication. + */ + fun getMediaDriverFile(publication: Publication): File { + return internal.getMediaDriverFile(publication) + } + + /** + * @return the aeron media driver log file for a specific image (within a subscription, an image is the "connection" with a publication). + */ + fun getMediaDriverFile(image: Image): File { + return internal.getMediaDriverFile(image) + } + /** * expose the internal counters of the Aeron driver */ diff --git a/src/dorkbox/network/aeron/AeronDriverInternal.kt b/src/dorkbox/network/aeron/AeronDriverInternal.kt index 19213207..89c9c4d1 100644 --- a/src/dorkbox/network/aeron/AeronDriverInternal.kt +++ b/src/dorkbox/network/aeron/AeronDriverInternal.kt @@ -511,7 +511,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C /** * Guarantee that the publication is closed AND the backing file is removed */ - suspend fun closeAndDeletePublication(publication: Publication, logger: KLogger, logInfo: String) = stateMutex.withLock { + suspend fun close(publication: Publication, logger: KLogger, logInfo: String) = stateMutex.withLock { val name = if (publication is ConcurrentPublication) { "publication" } else { @@ -562,7 +562,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C /** * Guarantee that the publication is closed AND the backing file is removed */ - suspend fun closeAndDeleteSubscription(subscription: Subscription, logger: KLogger, logInfo: String) { + suspend fun close(subscription: Subscription, logger: KLogger, logInfo: String) { logger.trace { "Aeron Driver [$driverId]: Closing subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" } val aeron1 = aeron @@ -841,17 +841,17 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C /** - * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) + * @return the aeron media driver log file for a specific publication. */ - private fun getMediaDriverFile(publication: Publication): File { + fun getMediaDriverFile(publication: Publication): File { return context.directory.resolve("publications").resolve("${publication.registrationId()}.logbuffer") } /** - * @return the aeron media driver log file for a specific subscription. This should be removed when a subscription is closed (but is not always!) + * @return the aeron media driver log file for a specific image (within a subscription, an image is the "connection" with a publication). */ - private fun getMediaDriverFile(subscription: Subscription): File { - return context.directory.resolve("subscriptions").resolve("${subscription.registrationId()}.logbuffer") + fun getMediaDriverFile(image: Image): File { + return context.directory.resolve("images").resolve("${image.correlationId()}.logbuffer") } /** diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index d576a420..5eb0bbeb 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -401,7 +401,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { logger.debug {"[$toString0] connection closing"} // on close, we want to make sure this file is DELETED! - endPoint.aeronDriver.closeAndDeleteSubscription(subscription, toString0) + endPoint.aeronDriver.close(subscription, toString0) // notify the remote endPoint that we are closing // we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back) @@ -420,7 +420,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { } // on close, we want to make sure this file is DELETED! - endPoint.aeronDriver.closeAndDeletePublication(publication, toString0) + endPoint.aeronDriver.close(publication, toString0) // NOTE: any waiting RMI messages that are in-flight will terminate when they time-out (and then do nothing) // NOTE: notifyDisconnect() is called inside closeAction()!! diff --git a/src/dorkbox/network/handshake/ClientHandshakeDriver.kt b/src/dorkbox/network/handshake/ClientHandshakeDriver.kt index a7730650..f139e21f 100644 --- a/src/dorkbox/network/handshake/ClientHandshakeDriver.kt +++ b/src/dorkbox/network/handshake/ClientHandshakeDriver.kt @@ -333,7 +333,7 @@ internal class ClientHandshakeDriver( streamIdAllocator.free(pubSub.streamIdSub) // on close, we want to make sure this file is DELETED! - aeronDriver.closeAndDeleteSubscription(pubSub.sub, logInfo) - aeronDriver.closeAndDeletePublication(pubSub.pub, logInfo) + aeronDriver.close(pubSub.sub, logInfo) + aeronDriver.close(pubSub.pub, logInfo) } } diff --git a/src/dorkbox/network/handshake/ServerHandshakeDriver.kt b/src/dorkbox/network/handshake/ServerHandshakeDriver.kt index 96a26613..66840d19 100644 --- a/src/dorkbox/network/handshake/ServerHandshakeDriver.kt +++ b/src/dorkbox/network/handshake/ServerHandshakeDriver.kt @@ -63,7 +63,7 @@ internal class ServerHandshakeDriver( } suspend fun close() { - aeronDriver.closeAndDeleteSubscription(subscription, logInfo) + aeronDriver.close(subscription, logInfo) } override fun toString(): String { diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index 4d7fc960..248b2b05 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -77,7 +77,7 @@ internal object ServerHandshakePollers { .expirationPolicy(ExpirationPolicy.CREATED) .expirationListener { connectKey, publication -> runBlocking { - driver.closeAndDeletePublication(publication, "Server IPC Handshake ($connectKey)") + driver.close(publication, "Server IPC Handshake ($connectKey)") } } .build() @@ -89,6 +89,8 @@ internal object ServerHandshakePollers { // for the handshake, the sessionId IS NOT GLOBALLY UNIQUE val sessionId = header.sessionId() val streamId = header.streamId() + val image = header.context() as Image + val logInfo = "$sessionId/$streamId : IPC" // Server is the "source", client mirrors the server // ugh, this is verbose -- but necessary @@ -105,7 +107,13 @@ internal object ServerHandshakePollers { } catch (e: Exception) { server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error de-serializing handshake message!!", e)) null - } ?: return + } ?: run { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + return + } @@ -124,6 +132,10 @@ internal object ServerHandshakePollers { val publication = try { driver.addExclusivePublication(publicationUri, message.streamId, logInfo, true) } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) return@launch } @@ -134,6 +146,10 @@ internal object ServerHandshakePollers { ServerTimedoutException("$logInfo publication cannot connect with client!", cause) } } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) return@launch } @@ -155,16 +171,24 @@ internal object ServerHandshakePollers { if (success) { publications[connectKey] = publication } else { - driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC") + driver.close(publication, "HANDSHAKE-IPC") } } catch (e: Exception) { - driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC") + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + driver.close(publication, "HANDSHAKE-IPC") server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) } } else { val publication = publications.remove(connectKey) if (publication == null) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to IPC")) return@launch } @@ -179,7 +203,7 @@ internal object ServerHandshakePollers { logger = logger ) - driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC") + driver.close(publication, "HANDSHAKE-IPC") } } } @@ -215,7 +239,7 @@ internal object ServerHandshakePollers { .expirationPolicy(ExpirationPolicy.CREATED) .expirationListener { connectKey, publication -> runBlocking { - driver.closeAndDeletePublication(publication, "Server UDP Handshake ($connectKey)") + driver.close(publication, "Server UDP Handshake ($connectKey)") } } .build() @@ -232,9 +256,10 @@ internal object ServerHandshakePollers { // for the handshake, the sessionId IS NOT GLOBALLY UNIQUE val sessionId = header.sessionId() val streamId = header.streamId() + val image = header.context() as Image // note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!) - val remoteIpAndPort = (header.context() as Image).sourceIdentity() + val remoteIpAndPort = image.sourceIdentity() // split val splitPoint = remoteIpAndPort.lastIndexOf(':') @@ -243,6 +268,10 @@ internal object ServerHandshakePollers { // this should never be null, because we are feeding it a valid IP address from aeron val clientAddress = IP.toAddress(clientAddressString) if (clientAddress == null) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + // Server is the "source", client mirrors the server server.listenerManager.notifyError(ServerHandshakeException("[$sessionId/$streamId] Connection from $clientAddressString not allowed! Invalid IP address!")) return @@ -254,6 +283,10 @@ internal object ServerHandshakePollers { clientAddressString = IP.toString(clientAddress) if (ipInfo.ipType == IpInfo.Companion.IpListenType.IPv4Wildcard) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + // we DO NOT want to listen to IPv4 traffic, but we received IPv4 traffic! server.listenerManager.notifyError(ServerHandshakeException("[$sessionId/$streamId] Connection from $clientAddressString not allowed! IPv4 connections not permitted!")) return @@ -277,7 +310,13 @@ internal object ServerHandshakePollers { } catch (e: Exception) { server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error de-serializing handshake message!!", e)) null - } ?: return + } ?: run { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + return + } @@ -301,6 +340,10 @@ internal object ServerHandshakePollers { val publication = try { driver.addExclusivePublication(publicationUri, message.streamId, logInfo, false) } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) return@launch } @@ -311,6 +354,10 @@ internal object ServerHandshakePollers { ServerTimedoutException("$logInfo publication cannot connect with client!", cause) } } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) return@launch } @@ -332,7 +379,11 @@ internal object ServerHandshakePollers { if (success) { publications[connectKey] = publication } else { - driver.closeAndDeletePublication(publication, logInfo) + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + driver.close(publication, logInfo) } } else { @@ -341,6 +392,10 @@ internal object ServerHandshakePollers { val publication = publications.remove(connectKey) if (publication == null) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to $clientAddressString")) return@launch } @@ -355,7 +410,7 @@ internal object ServerHandshakePollers { logger = logger ) - driver.closeAndDeletePublication(publication, logInfo) + driver.close(publication, logInfo) } } } diff --git a/test/dorkboxTest/network/AeronPubSubTest.kt b/test/dorkboxTest/network/AeronPubSubTest.kt index 7e356153..e7b4268c 100644 --- a/test/dorkboxTest/network/AeronPubSubTest.kt +++ b/test/dorkboxTest/network/AeronPubSubTest.kt @@ -90,10 +90,10 @@ class AeronPubSubTest : BaseTest() { clientPublications.forEachIndexed { index, (clientDriver, pub) -> - clientDriver.closeAndDeletePublication(pub, "client_$index") + clientDriver.close(pub, "client_$index") } - serverDriver.closeAndDeleteSubscription(sub, "server") + serverDriver.close(sub, "server") clientDrivers.forEach { clientDriver -> @@ -180,10 +180,10 @@ class AeronPubSubTest : BaseTest() { clientPublications.forEachIndexed { index, (clientDriver, pub) -> - clientDriver.closeAndDeletePublication(pub, "client_$index") + clientDriver.close(pub, "client_$index") } - serverDriver.closeAndDeleteSubscription(sub, "server") + serverDriver.close(sub, "server") clientDrivers.forEach { clientDriver ->