diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 11e7a49e..b420ef47 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -223,6 +223,8 @@ internal class ServerHandshake( /** + * NOTE: This must not be called on the main thread because it is blocking! + * * @return true if the connection was SUCCESS. False if the handshake poller should immediately close the publication */ fun processIpcHandshakeMessageServer( @@ -331,6 +333,7 @@ internal class ServerHandshake( var newConnection: CONNECTION? = null try { // Create a pub/sub at the given address and port, using the given stream ID. + // NOTE: This must not be called on the main thread because it is blocking! val newConnectionDriver = ServerConnectionDriver.build( aeronDriver = aeronDriver, ipInfo = server.ipInfo, @@ -422,7 +425,7 @@ internal class ServerHandshake( } /** - * note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD + * NOTE: This must not be called on the main thread because it is blocking! * * @return true if the connection was SUCCESS. False if the handshake poller should immediately close the publication */ @@ -585,6 +588,7 @@ internal class ServerHandshake( var newConnection: CONNECTION? = null try { // Create a pub/sub at the given address and port, using the given stream ID. + // NOTE: This must not be called on the main thread because it is blocking! val newConnectionDriver = ServerConnectionDriver.build( ipInfo = server.ipInfo, aeronDriver = aeronDriver, diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index b85adb24..247dda5d 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -26,6 +26,7 @@ import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake import dorkbox.network.aeron.AeronPoller import dorkbox.network.connection.Connection +import dorkbox.network.connection.EventDispatcher import dorkbox.network.connection.IpInfo import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.ServerHandshakeException @@ -115,63 +116,84 @@ internal object ServerHandshakePollers { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.deleteLogFile(image) - return } - // we have read all the data, now dispatch it. - // HandshakeMessage.HELLO - // HandshakeMessage.DONE - val messageState = message.state - val connectKey = message.connectKey - if (messageState == HandshakeMessage.HELLO) { - // we create a NEW publication for the handshake, which connects directly to the client handshake subscription + // NOTE: This MUST to happen in separates thread so that we can take as long as we need when creating publications and handshaking, + // because under load -- this will REGULARLY timeout! Under no circumstance can this happen in the main processing thread!! + EventDispatcher.MULTI.launch { + // we have read all the data, now dispatch it. + // HandshakeMessage.HELLO + // HandshakeMessage.DONE + val messageState = message.state + val connectKey = message.connectKey - val publicationUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable) - // this will always connect to the CLIENT handshake subscription! - val publication = try { - driver.addExclusivePublication(publicationUri, message.streamId, logInfo, true) - } catch (e: Exception) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) + if (messageState == HandshakeMessage.HELLO) { + // we create a NEW publication for the handshake, which connects directly to the client handshake subscription - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) - return - } + val publicationUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable) - try { - // we actually have to wait for it to connect before we continue - driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause -> - ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause) + // this will always connect to the CLIENT handshake subscription! + val publication = try { + driver.addExclusivePublication(publicationUri, message.streamId, logInfo, true) } - } catch (e: Exception) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) + catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) - return - } + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) + return@launch + } + + try { + // we actually have to wait for it to connect before we continue + driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause -> + ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause) + } + } + catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) + + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create IPC publication back to client remote process", e)) + return@launch + } - try { - val success = handshake.processIpcHandshakeMessageServer( - server = server, - handshaker = handshaker, - aeronDriver = driver, - handshakePublication = publication, - publicKey = message.publicKey!!, - message = message, - logInfo = logInfo, - logger = logger - ) + try { + val success = handshake.processIpcHandshakeMessageServer( + server = server, + handshaker = handshaker, + aeronDriver = driver, + handshakePublication = publication, + publicKey = message.publicKey!!, + message = message, + logInfo = logInfo, + logger = logger + ) + + if (success) { + publications[connectKey] = publication + } + else { + try { + // we might not be able to close this connection. + driver.close(publication, logInfo) + } + catch (e: Exception) { + server.listenerManager.notifyError(e) + } + } + } + catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) - if (success) { - publications[connectKey] = publication - } else { try { // we might not be able to close this connection. driver.close(publication, logInfo) @@ -179,8 +201,35 @@ internal object ServerHandshakePollers { catch (e: Exception) { server.listenerManager.notifyError(e) } + + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) } - } catch (e: Exception) { + } else { + // HandshakeMessage.DONE + + val publication = publications.remove(connectKey) + if (publication == null) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) + + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to IPC")) + return@launch + } + + try { + handshake.validateMessageTypeAndDoPending( + server = server, + handshaker = handshaker, + handshakePublication = publication, + message = message, + logInfo = logInfo, + logger = logger + ) + } catch (e: Exception) { + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) + } + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.deleteLogFile(image) @@ -192,45 +241,6 @@ internal object ServerHandshakePollers { catch (e: Exception) { server.listenerManager.notifyError(e) } - - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) - } - } else { - // HandshakeMessage.DONE - - val publication = publications.remove(connectKey) - if (publication == null) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) - - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to IPC")) - return - } - - try { - handshake.validateMessageTypeAndDoPending( - server = server, - handshaker = handshaker, - handshakePublication = publication, - message = message, - logInfo = logInfo, - logger = logger - ) - } catch (e: Exception) { - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) - } - - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) - - try { - // we might not be able to close this connection. - driver.close(publication, logInfo) - } - catch (e: Exception) { - server.listenerManager.notifyError(e) } } } @@ -365,68 +375,84 @@ internal object ServerHandshakePollers { return } + // NOTE: This MUST to happen in separates thread so that we can take as long as we need when creating publications and handshaking, + // because under load -- this will REGULARLY timeout! Under no circumstance can this happen in the main processing thread!! + EventDispatcher.MULTI.launch { + // HandshakeMessage.HELLO + // HandshakeMessage.DONE + val messageState = message.state + val connectKey = message.connectKey - // HandshakeMessage.HELLO - // HandshakeMessage.DONE - val messageState = message.state - val connectKey = message.connectKey + if (messageState == HandshakeMessage.HELLO) { + // we create a NEW publication for the handshake, which connects directly to the client handshake subscription - if (messageState == HandshakeMessage.HELLO) { - // we create a NEW publication for the handshake, which connects directly to the client handshake subscription - - // we explicitly have the publisher "connect to itself", because we are using MDC to work around NAT. - // It will "auto-connect" to the correct client port (negotiated by the MDC client subscription negotiating on the - // control port of the server) - val publicationUri = uriHandshake(CommonContext.UDP_MEDIA, isReliable) - .controlEndpoint(ipInfo.getAeronPubAddress(isRemoteIpv4) + ":" + mdcPortPub) + // we explicitly have the publisher "connect to itself", because we are using MDC to work around NAT. + // It will "auto-connect" to the correct client port (negotiated by the MDC client subscription negotiating on the + // control port of the server) + val publicationUri = uriHandshake(CommonContext.UDP_MEDIA, isReliable) + .controlEndpoint(ipInfo.getAeronPubAddress(isRemoteIpv4) + ":" + mdcPortPub) - // this will always connect to the CLIENT handshake subscription! - val publication = try { - driver.addExclusivePublication(publicationUri, message.streamId, logInfo, false) - } catch (e: Exception) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) + // this will always connect to the CLIENT handshake subscription! + val publication = try { + driver.addExclusivePublication(publicationUri, message.streamId, logInfo, false) + } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) - return - } - - try { - // we actually have to wait for it to connect before we continue - driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause -> - ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause) + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) + return@launch } - } catch (e: Exception) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) - return - } + try { + // we actually have to wait for it to connect before we continue. + // + driver.waitForConnection(publication, handshakeTimeoutNs, logInfo) { cause -> + ServerTimedoutException("$logInfo publication cannot connect with client in ${Sys.getTimePrettyFull(handshakeTimeoutNs)}", cause) + } + } catch (e: Exception) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) - try { - val success = handshake.processUdpHandshakeMessageServer( - server = server, - handshaker = handshaker, - handshakePublication = publication, - publicKey = message.publicKey!!, - clientAddress = clientAddress, - clientAddressString = clientAddressString, - portPub = message.port, - portSub = serverPortSub, - mdcPortPub = mdcPortPub, - isReliable = isReliable, - message = message, - logInfo = logInfo, - logger = logger - ) + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Cannot create publication back to $clientAddressString", e)) + return@launch + } - if (success) { - publications[connectKey] = publication - } else { + try { + val success = handshake.processUdpHandshakeMessageServer( + server = server, + handshaker = handshaker, + handshakePublication = publication, + publicKey = message.publicKey!!, + clientAddress = clientAddress, + clientAddressString = clientAddressString, + portPub = message.port, + portSub = serverPortSub, + mdcPortPub = mdcPortPub, + isReliable = isReliable, + message = message, + logInfo = logInfo, + logger = logger + ) + + if (success) { + publications[connectKey] = publication + } else { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) + + try { + // we might not be able to close this connection. + driver.close(publication, logInfo) + } + catch (e: Exception) { + server.listenerManager.notifyError(e) + } + } + } catch (e: Exception) { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.deleteLogFile(image) @@ -436,62 +462,50 @@ internal object ServerHandshakePollers { driver.close(publication, logInfo) } catch (e: Exception) { - server.listenerManager.notifyError(e) + driver.close(publication, logInfo) } + + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) + } + } else { + // HandshakeMessage.DONE + + val publication = publications.remove(connectKey) + + if (publication == null) { + // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors + // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! + driver.deleteLogFile(image) + + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to $clientAddressString")) + return@launch + } + + try { + handshake.validateMessageTypeAndDoPending( + server = server, + handshaker = handshaker, + handshakePublication = publication, + message = message, + logInfo = logInfo, + logger = logger + ) + } catch (e: Exception) { + server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) } - } catch (e: Exception) { - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) try { // we might not be able to close this connection. driver.close(publication, logInfo) } catch (e: Exception) { - driver.close(publication, logInfo) + server.listenerManager.notifyError(e) } - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) - } - } else { - // HandshakeMessage.DONE - - val publication = publications.remove(connectKey) - - if (publication == null) { // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! driver.deleteLogFile(image) - - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] No publication back to $clientAddressString")) - return } - - try { - handshake.validateMessageTypeAndDoPending( - server = server, - handshaker = handshaker, - handshakePublication = publication, - message = message, - logInfo = logInfo, - logger = logger - ) - } catch (e: Exception) { - server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e)) - } - - try { - // we might not be able to close this connection. - driver.close(publication, logInfo) - } - catch (e: Exception) { - server.listenerManager.notifyError(e) - } - - // we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors - // and connections occur too quickly (within the cleanup/linger period), we can run out of memory! - driver.deleteLogFile(image) } }