Fixed out-of-memory errors triggered when messages failed to deserialize during the HANDSHAKE: the image log file representing the bad connection is now deleted immediately.

This commit is contained in:
Robinson 2023-07-03 10:46:18 +02:00
parent 0ae5b5f927
commit ee296394de
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
7 changed files with 100 additions and 31 deletions

View File

@ -477,7 +477,7 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
delay(200L)
}
closeAndDeletePublication(publication, logInfo)
close(publication, logInfo)
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in $handshakeTimeoutSec seconds while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}"))
exception.cleanAllStackTrace()
@ -527,8 +527,8 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
*
* This can throw exceptions!
*/
suspend fun closeAndDeletePublication(publication: Publication, logInfo: String) {
internal.closeAndDeletePublication(publication, logger, logInfo)
suspend fun close(publication: Publication, logInfo: String) {
internal.close(publication, logger, logInfo)
}
/**
@ -536,8 +536,8 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
*
* This can throw exceptions!
*/
suspend fun closeAndDeleteSubscription(subscription: Subscription, logInfo: String) {
internal.closeAndDeleteSubscription(subscription, logger, logInfo)
suspend fun close(subscription: Subscription, logInfo: String) {
internal.close(subscription, logger, logInfo)
}
@ -565,6 +565,20 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
suspend fun isInUse(): Boolean = internal.isInUse(logger)
/**
 * Locates the aeron media driver log file that backs a specific publication.
 *
 * This is a pass-through to the internal driver, which performs the actual path resolution.
 *
 * @param publication the publication whose log file is wanted
 * @return the aeron media driver log file for that publication
 */
fun getMediaDriverFile(publication: Publication): File = internal.getMediaDriverFile(publication)
/**
 * Locates the aeron media driver log file that backs a specific image
 * (within a subscription, an image is the "connection" with a publication).
 *
 * This is a pass-through to the internal driver, which performs the actual path resolution.
 *
 * @param image the image whose log file is wanted
 * @return the aeron media driver log file for that image
 */
fun getMediaDriverFile(image: Image): File = internal.getMediaDriverFile(image)
/**
* expose the internal counters of the Aeron driver
*/

View File

@ -511,7 +511,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
/**
* Guarantee that the publication is closed AND the backing file is removed
*/
suspend fun closeAndDeletePublication(publication: Publication, logger: KLogger, logInfo: String) = stateMutex.withLock {
suspend fun close(publication: Publication, logger: KLogger, logInfo: String) = stateMutex.withLock {
val name = if (publication is ConcurrentPublication) {
"publication"
} else {
@ -562,7 +562,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
/**
* Guarantee that the subscription is closed AND the backing file is removed
*/
suspend fun closeAndDeleteSubscription(subscription: Subscription, logger: KLogger, logInfo: String) {
suspend fun close(subscription: Subscription, logger: KLogger, logInfo: String) {
logger.trace { "Aeron Driver [$driverId]: Closing subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" }
val aeron1 = aeron
@ -841,17 +841,17 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
/**
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
* @return the aeron media driver log file for a specific publication.
*/
private fun getMediaDriverFile(publication: Publication): File {
fun getMediaDriverFile(publication: Publication): File {
return context.directory.resolve("publications").resolve("${publication.registrationId()}.logbuffer")
}
/**
* @return the aeron media driver log file for a specific subscription. This should be removed when a subscription is closed (but is not always!)
* @return the aeron media driver log file for a specific image (within a subscription, an image is the "connection" with a publication).
*/
private fun getMediaDriverFile(subscription: Subscription): File {
return context.directory.resolve("subscriptions").resolve("${subscription.registrationId()}.logbuffer")
fun getMediaDriverFile(image: Image): File {
return context.directory.resolve("images").resolve("${image.correlationId()}.logbuffer")
}
/**

View File

@ -401,7 +401,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
logger.debug {"[$toString0] connection closing"}
// on close, we want to make sure this file is DELETED!
endPoint.aeronDriver.closeAndDeleteSubscription(subscription, toString0)
endPoint.aeronDriver.close(subscription, toString0)
// notify the remote endPoint that we are closing
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
@ -420,7 +420,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// on close, we want to make sure this file is DELETED!
endPoint.aeronDriver.closeAndDeletePublication(publication, toString0)
endPoint.aeronDriver.close(publication, toString0)
// NOTE: any waiting RMI messages that are in-flight will terminate when they time-out (and then do nothing)
// NOTE: notifyDisconnect() is called inside closeAction()!!

View File

@ -333,7 +333,7 @@ internal class ClientHandshakeDriver(
streamIdAllocator.free(pubSub.streamIdSub)
// on close, we want to make sure this file is DELETED!
aeronDriver.closeAndDeleteSubscription(pubSub.sub, logInfo)
aeronDriver.closeAndDeletePublication(pubSub.pub, logInfo)
aeronDriver.close(pubSub.sub, logInfo)
aeronDriver.close(pubSub.pub, logInfo)
}
}

View File

@ -63,7 +63,7 @@ internal class ServerHandshakeDriver(
}
suspend fun close() {
aeronDriver.closeAndDeleteSubscription(subscription, logInfo)
aeronDriver.close(subscription, logInfo)
}
override fun toString(): String {

View File

@ -77,7 +77,7 @@ internal object ServerHandshakePollers {
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<Long, Publication> { connectKey, publication ->
runBlocking {
driver.closeAndDeletePublication(publication, "Server IPC Handshake ($connectKey)")
driver.close(publication, "Server IPC Handshake ($connectKey)")
}
}
.build<Long, Publication>()
@ -89,6 +89,8 @@ internal object ServerHandshakePollers {
// for the handshake, the sessionId IS NOT GLOBALLY UNIQUE
val sessionId = header.sessionId()
val streamId = header.streamId()
val image = header.context() as Image
val logInfo = "$sessionId/$streamId : IPC" // Server is the "source", client mirrors the server
// ugh, this is verbose -- but necessary
@ -105,7 +107,13 @@ internal object ServerHandshakePollers {
} catch (e: Exception) {
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error de-serializing handshake message!!", e))
null
} ?: return
} ?: run {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
return
}
@ -124,6 +132,10 @@ internal object ServerHandshakePollers {
val publication = try {
driver.addExclusivePublication(publicationUri, message.streamId, logInfo, true)
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e))
return@launch
}
@ -134,6 +146,10 @@ internal object ServerHandshakePollers {
ServerTimedoutException("$logInfo publication cannot connect with client!", cause)
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e))
return@launch
}
@ -155,16 +171,24 @@ internal object ServerHandshakePollers {
if (success) {
publications[connectKey] = publication
} else {
driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC")
driver.close(publication, "HANDSHAKE-IPC")
}
} catch (e: Exception) {
driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC")
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, "HANDSHAKE-IPC")
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
} else {
val publication = publications.remove(connectKey)
if (publication == null) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to IPC"))
return@launch
}
@ -179,7 +203,7 @@ internal object ServerHandshakePollers {
logger = logger
)
driver.closeAndDeletePublication(publication, "HANDSHAKE-IPC")
driver.close(publication, "HANDSHAKE-IPC")
}
}
}
@ -215,7 +239,7 @@ internal object ServerHandshakePollers {
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<Long, Publication> { connectKey, publication ->
runBlocking {
driver.closeAndDeletePublication(publication, "Server UDP Handshake ($connectKey)")
driver.close(publication, "Server UDP Handshake ($connectKey)")
}
}
.build<Long, Publication>()
@ -232,9 +256,10 @@ internal object ServerHandshakePollers {
// for the handshake, the sessionId IS NOT GLOBALLY UNIQUE
val sessionId = header.sessionId()
val streamId = header.streamId()
val image = header.context() as Image
// note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!)
val remoteIpAndPort = (header.context() as Image).sourceIdentity()
val remoteIpAndPort = image.sourceIdentity()
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
@ -243,6 +268,10 @@ internal object ServerHandshakePollers {
// this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IP.toAddress(clientAddressString)
if (clientAddress == null) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
// Server is the "source", client mirrors the server
server.listenerManager.notifyError(ServerHandshakeException("[$sessionId/$streamId] Connection from $clientAddressString not allowed! Invalid IP address!"))
return
@ -254,6 +283,10 @@ internal object ServerHandshakePollers {
clientAddressString = IP.toString(clientAddress)
if (ipInfo.ipType == IpInfo.Companion.IpListenType.IPv4Wildcard) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
// we DO NOT want to listen to IPv4 traffic, but we received IPv4 traffic!
server.listenerManager.notifyError(ServerHandshakeException("[$sessionId/$streamId] Connection from $clientAddressString not allowed! IPv4 connections not permitted!"))
return
@ -277,7 +310,13 @@ internal object ServerHandshakePollers {
} catch (e: Exception) {
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error de-serializing handshake message!!", e))
null
} ?: return
} ?: run {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
return
}
@ -301,6 +340,10 @@ internal object ServerHandshakePollers {
val publication = try {
driver.addExclusivePublication(publicationUri, message.streamId, logInfo, false)
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e))
return@launch
}
@ -311,6 +354,10 @@ internal object ServerHandshakePollers {
ServerTimedoutException("$logInfo publication cannot connect with client!", cause)
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e))
return@launch
}
@ -332,7 +379,11 @@ internal object ServerHandshakePollers {
if (success) {
publications[connectKey] = publication
} else {
driver.closeAndDeletePublication(publication, logInfo)
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, logInfo)
}
} else {
@ -341,6 +392,10 @@ internal object ServerHandshakePollers {
val publication = publications.remove(connectKey)
if (publication == null) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to $clientAddressString"))
return@launch
}
@ -355,7 +410,7 @@ internal object ServerHandshakePollers {
logger = logger
)
driver.closeAndDeletePublication(publication, logInfo)
driver.close(publication, logInfo)
}
}
}

View File

@ -90,10 +90,10 @@ class AeronPubSubTest : BaseTest() {
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.closeAndDeletePublication(pub, "client_$index")
clientDriver.close(pub, "client_$index")
}
serverDriver.closeAndDeleteSubscription(sub, "server")
serverDriver.close(sub, "server")
clientDrivers.forEach { clientDriver ->
@ -180,10 +180,10 @@ class AeronPubSubTest : BaseTest() {
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.closeAndDeletePublication(pub, "client_$index")
clientDriver.close(pub, "client_$index")
}
serverDriver.closeAndDeleteSubscription(sub, "server")
serverDriver.close(sub, "server")
clientDrivers.forEach { clientDriver ->