diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index d52de304..c1bc8649 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -126,6 +126,7 @@ internal object ServerHandshakePollers { if (messageState == HandshakeMessage.HELLO) { // we create a NEW publication for the handshake, which connects directly to the client handshake subscription + val publicationUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable) // this will always connect to the CLIENT handshake subscription! @@ -171,19 +172,20 @@ internal object ServerHandshakePollers { if (success) { publications[connectKey] = publication } else { - driver.close(publication, "HANDSHAKE-IPC") + driver.close(publication, logInfo) } } catch (e: Exception) { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.getMediaDriverFile(image).delete() - driver.close(publication, "HANDSHAKE-IPC") + driver.close(publication, logInfo) server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) } } else { - val publication = publications.remove(connectKey) + // HandshakeMessage.DONE + val publication = publications.remove(connectKey) if (publication == null) { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! @@ -193,17 +195,24 @@ internal object ServerHandshakePollers { return@launch } - // HandshakeMessage.DONE - handshake.validateMessageTypeAndDoPending( - server = server, - handshaker = handshaker, - handshakePublication = publication, - message = message, - aeronLogInfo = logInfo, - logger = logger - ) + try { + handshake.validateMessageTypeAndDoPending( + server = server, + handshaker = handshaker, + handshakePublication = publication, + message = message, + aeronLogInfo = logInfo, + logger = logger + ) + } catch (e: Exception) { + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) + } - driver.close(publication, "HANDSHAKE-IPC") + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + driver.close(publication, logInfo) } } } @@ -370,29 +379,39 @@ internal object ServerHandshakePollers { return@launch } - val success = handshake.processUdpHandshakeMessageServer( - server = server, - handshaker = handshaker, - handshakePublication = publication, - publicKey = message.publicKey!!, - clientAddress = clientAddress, - clientAddressString = clientAddressString, - isReliable = isReliable, - message = message, - aeronLogInfo = logInfo, - connectionFunc = connectionFunc, - logger = logger - ) - if (success) { - publications[connectKey] = publication - } else { + try { + val success = handshake.processUdpHandshakeMessageServer( + server = server, + handshaker = handshaker, + handshakePublication = publication, + publicKey = message.publicKey!!, + clientAddress = clientAddress, + clientAddressString = clientAddressString, + isReliable = isReliable, + message = message, + aeronLogInfo = logInfo, + connectionFunc = connectionFunc, + logger = logger + ) + + if (success) { + publications[connectKey] = publication + } else { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() + + driver.close(publication, logInfo) + } + } catch (e: Exception) { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.getMediaDriverFile(image).delete() - driver.close(publication, logInfo) - } + driver.close(publication, logInfo) + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) + } } else { // HandshakeMessage.DONE @@ -407,15 +426,22 @@ internal object ServerHandshakePollers { return@launch } + try { + handshake.validateMessageTypeAndDoPending( + server = server, + handshaker = handshaker, + handshakePublication = publication, + message = message, + aeronLogInfo = logInfo, + logger = logger + ) + } catch (e: Exception) { + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) + } - handshake.validateMessageTypeAndDoPending( - server = server, - handshaker = handshaker, - handshakePublication = publication, - message = message, - aeronLogInfo = logInfo, - logger = logger - ) + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.getMediaDriverFile(image).delete() driver.close(publication, logInfo) }