diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index b78f6191..86743d64 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -130,6 +130,15 @@ open class Client(config: ClientConfiguration = ClientC var connectionTimeoutSec: Int = 0 private set + /** + * - if the client is internally going to reconnect (because of a network error) + * - we have specified that we will run the disconnect logic + * - there is reconnect logic in the disconnect handler + * + * Then ultimately, we want to ignore the disconnect-handler reconnect (we do not want to have multiple reconnects happening concurrently) + */ + @Volatile + private var autoReconnect = false private val handshake = ClientHandshake(this, logger) @@ -165,6 +174,15 @@ open class Client(config: ClientConfiguration = ClientC */ @Suppress("DuplicatedCode") fun reconnect() { + if (autoReconnect) { + // we must check if we should permit a MANUAL reconnect, because the auto-reconnect MIGHT ALSO re-connect! + + // autoReconnect will be "reset" when the connection closes. If in a happy state, then a manual reconnect is permitted. + logger.info("Ignoring reconnect, auto-reconnect is in progress") + return + } + + if (connectionTimeoutSec == 0) { logger.info("Reconnecting...") } else { @@ -509,6 +527,7 @@ open class Client(config: ClientConfiguration = ClientC val startTime = System.nanoTime() var success = false while (!stopConnectOnShutdown && (connectionTimoutInNs == 0L || System.nanoTime() - startTime < connectionTimoutInNs)) { + if (isShutdown()) { resetOnError() @@ -639,11 +658,12 @@ open class Client(config: ClientConfiguration = ClientC if (stopConnectOnShutdown) { val exception = ClientException("Client closed during connection attempt. Aborting connection attempts.").cleanStackTrace(3) listenerManager.notifyError(exception) + // if we are waiting for this connection to connect (on a different thread, for example), make sure to release it. + closeLatch.countDown() throw exception } if (System.nanoTime() - startTime < connectionTimoutInNs) { - val type = if (isIPC) { "IPC" } else { @@ -703,7 +723,7 @@ open class Client(config: ClientConfiguration = ClientC // NOTE: this can change depending on what the server specifies! // should we queue messages during a reconnect? This is important if the client/server connection is unstable if (connectionInfo.enableSession && sessionManager is SessionManagerNoOp) { - sessionManager = SessionManagerFull(config, aeronDriver, connectionInfo.sessionTimeout) + sessionManager = SessionManagerFull(config, listenerManager as ListenerManager, aeronDriver, connectionInfo.sessionTimeout) } else if (!connectionInfo.enableSession && sessionManager is SessionManagerFull) { sessionManager = SessionManagerNoOp() } @@ -809,12 +829,14 @@ open class Client(config: ClientConfiguration = ClientC logger.debug("[${handshakeConnection.details}] $connType (${newConnection.id}) done with handshake.") } + connection0 = newConnection + newConnection.setImage() + + // in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session // when applicable - we ALSO want to restore RMI objects BEFORE the connection is fully setup! val newSession = sessionManager.onInit(newConnection) - newConnection.setImage() - // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls if (newSession) { listenerManager.notifyInit(newConnection) @@ -822,29 +844,30 @@ open class Client(config: ClientConfiguration = ClientC // this enables the connection to start polling for messages addConnection(newConnection) - connection0 = newConnection - + // if we shutdown/close before the poller starts, we don't want to block forever pollerClosedLatch = CountDownLatch(1) networkEventPoller.submit( action = object : EventActionOperator { override fun invoke(): Int { + val connection = connection0 + // if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed // we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip - return if (!(shutdownEventPoller || newConnection.isClosed() || newConnection.isClosedWithTimeout())) { - newConnection.poll() - } else { - // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted. - if (logger.isDebugEnabled) { - logger.debug("[${connection}] connection expired (cleanup)") + return if (connection != null) { + if (!shutdownEventPoller && connection.canPoll()) { + connection.poll() + } else { + // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted. + logger.error("[${connection}] connection expired (cleanup). shutdownEventPoller=$shutdownEventPoller isClosed()=${connection.isClosed()} isClosedWithTimeout=${connection.isClosedWithTimeout()}") + + if (logger.isDebugEnabled) { + logger.debug("[{}] connection expired (cleanup)", connection) + } + // remove ourselves from processing + EventPoller.REMOVE } - - // the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected) - removeConnection(newConnection) - - // we already removed the connection, we can call it again without side effects - newConnection.close() - + } else { // remove ourselves from processing EventPoller.REMOVE } @@ -852,14 +875,20 @@ open class Client(config: ClientConfiguration = ClientC }, onClose = object : EventCloseOperator { override fun invoke() { - val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError - val uncleanDisconnect = !shutdownEventPoller && !newConnection.isClosed() && newConnection.isClosedWithTimeout() - val autoReconnect = mustRestartDriverOnError || uncleanDisconnect + val connection = connection0 + if (connection == null) { + logger.error("Unable to continue, as the connection has been removed before event dispatch shutdown!") + return + } + val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError + val dirtyDisconnectWithSession = (this@Client is SessionClient) && !shutdownEventPoller && connection.isDirtyClose() + + autoReconnect = mustRestartDriverOnError || dirtyDisconnectWithSession if (mustRestartDriverOnError) { logger.error("[{}] Critical driver error detected, reconnecting client", connection) - } else if (dirtyDisconnect) { + } else if (dirtyDisconnectWithSession) { logger.error("[{}] Dirty disconnect detected, reconnecting client", connection) } @@ -871,13 +900,7 @@ open class Client(config: ClientConfiguration = ClientC // we only need to run shutdown methods if there was a network outage or D/C if (!shutdownInProgress.value) { // this is because we restart automatically on driver errors/weird timeouts - val standardClose = !autoReconnect - this@Client.close( - closeEverything = false, - sendDisconnectMessage = standardClose, - notifyDisconnect = standardClose, - releaseWaitingThreads = standardClose - ) + this@Client.close(closeEverything = false, sendDisconnectMessage = true, releaseWaitingThreads = !autoReconnect) } @@ -885,29 +908,49 @@ open class Client(config: ClientConfiguration = ClientC endpointIsRunning.lazySet(false) pollerClosedLatch.countDown() + + connection0 = null + if (autoReconnect) { - EventDispatcher.launchSequentially(EventDispatcher.CONNECT) { + // clients can reconnect automatically ONLY if there are driver errors, otherwise it's explicit! + eventDispatch.CLOSE.launch { + logger.error("MUST AUTORECONNECT STARTING ON CLOSE ***********************************") waitForEndpointShutdown() // also wait for everyone else to shutdown!! aeronDriver.internal.endPointUsages.forEach { - it.waitForEndpointShutdown() + if (it !== this@Client) { + it.waitForEndpointShutdown() + } } // if we restart/reconnect too fast, errors from the previous run will still be present! aeronDriver.delayLingerTimeout() - reconnect() + if (connectionTimeoutSec == 0) { + logger.info("Reconnecting...", Exception()) + } else { + logger.info("Reconnecting... (timeout in $connectionTimeoutSec seconds)", Exception()) + } + + connect( + remoteAddress = address, + remoteAddressString = addressString, + remoteAddressPrettyString = addressPrettyString, + port1 = port1, + port2 = port2, + connectionTimeoutSec = connectionTimeoutSec, + reliable = reliable, + ) } - } else { - logger.debug("[{}] Closed the Network Event Poller...", connection) } + logger.debug("[{}] Closed the Network Event Poller task.", connection) } }) - if (newSession) { - listenerManager.notifyConnect(newConnection) - } else { + listenerManager.notifyConnect(newConnection) + + if (!newSession) { (newConnection as SessionConnection).sendPendingMessages() } } @@ -1007,12 +1050,7 @@ open class Client(config: ClientConfiguration = ClientC */ fun close(closeEverything: Boolean = true) { stopConnectOnShutdown = true - close( - closeEverything = closeEverything, - sendDisconnectMessage = true, - notifyDisconnect = true, - releaseWaitingThreads = true - ) + close(closeEverything = closeEverything, sendDisconnectMessage = true, releaseWaitingThreads = true) } override fun toString(): String { diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index f245dd7d..73dbda81 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -233,7 +233,7 @@ open class Server(config: ServerConfiguration = ServerC // this manages existing clients (for cleanup + connection polling). This has a concurrent iterator, // so we can modify this as we go connections.forEach { connection -> - if (!(connection.isClosed() || connection.isClosedWithTimeout()) ) { + if (connection.canPoll()) { // Otherwise, poll the connection for messages pollCount += connection.poll() } else {