Better try/catch around handshake logic

This commit is contained in:
Robinson 2023-07-03 11:52:46 +02:00
parent 7a39044df0
commit 7b7910d078
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -126,6 +126,7 @@ internal object ServerHandshakePollers {
if (messageState == HandshakeMessage.HELLO) {
// we create a NEW publication for the handshake, which connects directly to the client handshake subscription
val publicationUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable)
// this will always connect to the CLIENT handshake subscription!
@ -171,19 +172,20 @@ internal object ServerHandshakePollers {
if (success) {
publications[connectKey] = publication
} else {
driver.close(publication, "HANDSHAKE-IPC")
driver.close(publication, logInfo)
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, "HANDSHAKE-IPC")
driver.close(publication, logInfo)
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
} else {
val publication = publications.remove(connectKey)
// HandshakeMessage.DONE
val publication = publications.remove(connectKey)
if (publication == null) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
@ -193,17 +195,24 @@ internal object ServerHandshakePollers {
return@launch
}
// HandshakeMessage.DONE
handshake.validateMessageTypeAndDoPending(
server = server,
handshaker = handshaker,
handshakePublication = publication,
message = message,
aeronLogInfo = logInfo,
logger = logger
)
try {
handshake.validateMessageTypeAndDoPending(
server = server,
handshaker = handshaker,
handshakePublication = publication,
message = message,
aeronLogInfo = logInfo,
logger = logger
)
} catch (e: Exception) {
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
driver.close(publication, "HANDSHAKE-IPC")
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, logInfo)
}
}
}
@ -370,29 +379,39 @@ internal object ServerHandshakePollers {
return@launch
}
val success = handshake.processUdpHandshakeMessageServer(
server = server,
handshaker = handshaker,
handshakePublication = publication,
publicKey = message.publicKey!!,
clientAddress = clientAddress,
clientAddressString = clientAddressString,
isReliable = isReliable,
message = message,
aeronLogInfo = logInfo,
connectionFunc = connectionFunc,
logger = logger
)
if (success) {
publications[connectKey] = publication
} else {
try {
val success = handshake.processUdpHandshakeMessageServer(
server = server,
handshaker = handshaker,
handshakePublication = publication,
publicKey = message.publicKey!!,
clientAddress = clientAddress,
clientAddressString = clientAddressString,
isReliable = isReliable,
message = message,
aeronLogInfo = logInfo,
connectionFunc = connectionFunc,
logger = logger
)
if (success) {
publications[connectKey] = publication
} else {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, logInfo)
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, logInfo)
}
driver.close(publication, logInfo)
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
} else {
// HandshakeMessage.DONE
@ -407,15 +426,22 @@ internal object ServerHandshakePollers {
return@launch
}
try {
handshake.validateMessageTypeAndDoPending(
server = server,
handshaker = handshaker,
handshakePublication = publication,
message = message,
aeronLogInfo = logInfo,
logger = logger
)
} catch (e: Exception) {
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
handshake.validateMessageTypeAndDoPending(
server = server,
handshaker = handshaker,
handshakePublication = publication,
message = message,
aeronLogInfo = logInfo,
logger = logger
)
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.getMediaDriverFile(image).delete()
driver.close(publication, logInfo)
}