From 88f082097a0a79245befe75ee7df921b6864df49 Mon Sep 17 00:00:00 2001
From: Robinson
Date: Sun, 25 Jun 2023 12:21:21 +0200
Subject: [PATCH] code cleanup

---
 src/dorkbox/network/Client.kt | 346 +++++++++++++++-------------------
 src/dorkbox/network/Server.kt | 170 ++++-------------
 2 files changed, 190 insertions(+), 326 deletions(-)

diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt
index 535f58c5..c51d1a00 100644
--- a/src/dorkbox/network/Client.kt
+++ b/src/dorkbox/network/Client.kt
@@ -29,26 +29,27 @@ import dorkbox.network.connection.Connection
 import dorkbox.network.connection.ConnectionParams
 import dorkbox.network.connection.EndPoint
 import dorkbox.network.connection.EventDispatcher
-import dorkbox.network.connection.EventDispatcher.Companion.EVENT
 import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress
 import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
 import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
 import dorkbox.network.connection.PublicKeyValidationState
 import dorkbox.network.exceptions.ClientException
+import dorkbox.network.exceptions.ClientHandshakeException
 import dorkbox.network.exceptions.ClientRejectedException
 import dorkbox.network.exceptions.ClientRetryException
 import dorkbox.network.exceptions.ClientShutdownException
 import dorkbox.network.exceptions.ClientTimedOutException
 import dorkbox.network.exceptions.ServerException
+import dorkbox.network.exceptions.TransmitException
 import dorkbox.network.handshake.ClientHandshake
 import dorkbox.network.ping.Ping
-import kotlinx.atomicfu.atomic
+import dorkbox.util.sync.CountDownLatch
 import kotlinx.coroutines.runBlocking
 import mu.KotlinLogging
 import java.net.Inet4Address
 import java.net.Inet6Address
 import java.net.InetAddress
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*
 
 /**
  * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
@@ -143,9 +144,7 @@ open class Client<CONNECTION : Connection>(
         val timeout = TimeUnit.SECONDS.toMillis(configuration.connectionCloseTimeoutInSeconds.toLong() * 2)
 
         val logger = KotlinLogging.logger(Client::class.java.simpleName)
-        AeronDriver(configuration, logger).use {
-            it.ensureStopped(timeout, 500)
-        }
+        AeronDriver.ensureStopped(configuration, logger, timeout)
     }
 
     /**
@@ -157,9 +156,7 @@
      */
     fun isRunning(configuration: Configuration): Boolean = runBlocking {
         val logger = KotlinLogging.logger(Client::class.java.simpleName)
-        AeronDriver(configuration, logger).use {
-            it.isRunning()
-        }
+        AeronDriver.isRunning(configuration, logger)
    }
 
     init {
@@ -168,11 +165,6 @@
         }
     }
 
-    /**
-     * The UUID is a unique, in-memory instance that is created on object construction
-     */
-    val uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate()
-
     /**
      * The network or IPC address for the client to connect to.
      *
@@ -199,18 +191,10 @@
     @Volatile
     private var slowDownForException = false
 
-    @Volatile
-    private var isConnected = false
-
     // is valid when there is a connection to the server, otherwise it is null
+    @Volatile
     private var connection0: CONNECTION? = null
 
-    // This is set by the client so if there is a "connect()" call in the disconnect callback, we can have proper
-    // lock-stop ordering for how disconnect and connect work with each-other
-    // GUARANTEE that the callbacks for a NEW connect happen AFTER the previous 'onDisconnect' is finished.
-    // a CDL is used because it doesn't matter the order in which it's called (as it will always ensure it's correct)
-    private val disconnectInProgress = atomic<dorkbox.util.sync.CountDownLatch?>(null)
-
     final override fun newException(message: String, cause: Throwable?): Throwable {
         return ClientException(message, cause)
     }
@@ -259,23 +243,7 @@
                 reliable = reliable)
     }
 
-    /**
-     * Will attempt to connect to the server via IPC, with a default 30 second connection timeout and will block until completed.
-     *
-     * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
-     *
-     * @throws IllegalArgumentException if the remote address is invalid
-     * @throws ClientTimedOutException if the client is unable to connect in x amount of time
-     * @throws ClientRejectedException if the client connection is rejected
-     */
-    @Suppress("DuplicatedCode")
-    fun connectIpc(connectionTimeoutSec: Int = 30) = runBlocking {
-        connect(remoteAddress = null, // required!
-                remoteAddressString = IPC_NAME,
-                remoteAddressPrettyString = IPC_NAME,
-                connectionTimeoutSec = connectionTimeoutSec)
-    }
-
+// TODO:: the port should be part of the connect function!
     /**
      * Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed.
     *
@@ -335,16 +303,19 @@
         when {
             // this is default IPC settings
-            remoteAddress.isEmpty() && config.enableIpc -> {
-                connectIpc(connectionTimeoutSec = connectionTimeoutSec)
+            remoteAddress.isEmpty() && config.enableIpc -> runBlocking {
+                connect(remoteAddress = null, // required!
+                        remoteAddressString = IPC_NAME,
+                        remoteAddressPrettyString = IPC_NAME,
+                        connectionTimeoutSec = connectionTimeoutSec)
             }
 
             // IPv6 takes precedence ONLY if it's enabled manually
-            config.enableIPv6 -> connect(ResolvedAddressTypes.IPV6_ONLY)
-            config.enableIPv4 -> connect(ResolvedAddressTypes.IPV4_ONLY)
-            IPv4.isPreferred -> connect(ResolvedAddressTypes.IPV4_PREFERRED)
-            IPv6.isPreferred -> connect(ResolvedAddressTypes.IPV6_PREFERRED)
-            else -> connect(ResolvedAddressTypes.IPV4_PREFERRED)
+            config.enableIPv6 -> { connect(ResolvedAddressTypes.IPV6_ONLY) }
+            config.enableIPv4 -> { connect(ResolvedAddressTypes.IPV4_ONLY) }
+            IPv4.isPreferred -> { connect(ResolvedAddressTypes.IPV4_PREFERRED) }
+            IPv6.isPreferred -> { connect(ResolvedAddressTypes.IPV6_PREFERRED) }
+            else -> { connect(ResolvedAddressTypes.IPV4_PREFERRED) }
         }
     }
@@ -384,16 +355,13 @@
         connectionTimeoutSec: Int = 30,
         reliable: Boolean = true) {
 
-        // on the client, we must GUARANTEE that the disconnect completes before NEW connect begins.
-        if (!disconnectInProgress.compareAndSet(null, dorkbox.util.sync.CountDownLatch(1))) {
-            val disconnectCDL = disconnectInProgress.value!!
-
-            logger.debug { "Redispatching connect request!" }
-
-            EventDispatcher.launch(EVENT.CONNECT) {
-                logger.debug { "Redispatch connect request started!" }
-                disconnectCDL.await(config.connectionCloseTimeoutInSeconds.toLong(), TimeUnit.SECONDS)
+        // NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
+        // on the client, we must GUARANTEE that the disconnect/close completes before NEW connect begins.
+        // we will know this if we are running inside an INTERNAL dispatch that is NOT the connect dispatcher!
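Aside: the guard this hunk introduces replaces the old CountDownLatch dance, and the pattern is easy to get wrong, so here is a minimal standalone sketch of the same re-dispatch idea. It assumes only what the hunk itself shows (EventDispatcher.getCurrentEvent() returns the internal dispatcher the caller is running on, or null for external callers, and EventDispatcher.CONNECT.launch { } queues work onto the connect dispatcher); the redispatchConnect() helper name is hypothetical and not part of the library.

    import dorkbox.network.connection.EventDispatcher

    // Hypothetical helper (not in the patch): if we are already running on an
    // internal dispatcher that is NOT the CONNECT dispatcher (e.g. inside an
    // onDisconnect callback), bounce the work onto the CONNECT dispatcher so the
    // in-flight disconnect/close finishes before the new connect begins.
    fun redispatchConnect(doConnect: () -> Unit) {
        val currentDispatcher = EventDispatcher.getCurrentEvent()
        if (currentDispatcher != null && currentDispatcher != EventDispatcher.CONNECT) {
            EventDispatcher.CONNECT.launch {
                doConnect() // runs only after the current event completes
            }
            return
        }
        doConnect() // external caller, or already on CONNECT: run inline
    }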
+        val currentDispatcher = EventDispatcher.getCurrentEvent()
+        if (currentDispatcher != null && currentDispatcher != EventDispatcher.CONNECT) {
+            EventDispatcher.CONNECT.launch {
                 connect(remoteAddress, remoteAddressString, remoteAddressPrettyString,
@@ -403,16 +371,20 @@
             return
         }
 
-        // NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
+        // the lifecycle of a client is the ENDPOINT (measured via the network event poller) and CONNECTION (measured from connection closed)
+        if (!waitForClose()) {
+            if (endpointIsRunning.value) {
+                listenerManager.notifyError(ServerException("Unable to start, the client is already running!"))
+            } else {
+                listenerManager.notifyError(ClientException("Unable to connect the client!"))
+            }
+            return
+        }
+
         config as ClientConfiguration
 
         require(connectionTimeoutSec >= 0) { "connectionTimeoutSec '$connectionTimeoutSec' is invalid. It must be >=0" }
 
-        if (isConnected) {
-            logger.error { "Unable to connect when already connected!" }
-            return
-        }
-
         connection0 = null
 
         // localhost/loopback IP might not always be 127.0.0.1 or ::1
@@ -438,15 +410,20 @@
             require(false) { "Cannot connect to $remoteAddressPrettyString It is an invalid address!" }
         }
 
+        // if there are client crashes, we want to be able to still call connect()
+
+        // local scope ONLY until the connection is actually made - because if there are errors we throw this one away
+        val connectLatch = CountDownLatch(1)
+
         // we are done with initial configuration, now initialize aeron and the general state of this endpoint
         // this also makes sure that the dispatchers are still active.
         // Calling `client.close()` will shutdown the dispatchers (and a new client instance must be created)
         try {
             startDriver()
             verifyState()
-            initializeLatch()
+            initializeState()
         } catch (e: Exception) {
-            logger.error(e) { "Unable to start the endpoint!" }
+            resetOnError()
+            listenerManager.notifyError(ClientException("Unable to start the client!", e))
             return
         }
@@ -464,21 +441,25 @@
         // how long does the initial handshake take to connect
         var handshakeTimeoutSec = 5
         // how long before we COMPLETELY give up retrying
-        var timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
+        var connectionTimeoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
+
+        var connectionCloseTimeoutInSeconds = config.connectionCloseTimeoutInSeconds.toLong()
 
         if (DEBUG_CONNECTIONS) {
             // connections are extremely difficult to diagnose when the connection timeout is short
-            timoutInNanos += TimeUnit.HOURS.toSeconds(1).toInt()
-            handshakeTimeoutSec += TimeUnit.HOURS.toSeconds(1).toInt()
+            connectionTimeoutInNanos = TimeUnit.HOURS.toNanos(1)
+            handshakeTimeoutSec = TimeUnit.HOURS.toSeconds(1).toInt()
+            connectionCloseTimeoutInSeconds = TimeUnit.HOURS.toSeconds(1)
         }
 
         val startTime = System.nanoTime()
         var success = false
-        while (System.nanoTime() - startTime < timoutInNanos) {
+        while (System.nanoTime() - startTime < connectionTimeoutInNanos) {
             if (isShutdown()) {
+                resetOnError()
+
                 // If we are connecting indefinitely, we have to make sure to end the connection process
-                val exception = ClientShutdownException("Unable to connect while shutting down")
-                logger.error(exception) { "Aborting connection retry attempt to server." }
+                val exception = ClientShutdownException("Unable to connect while shutting down, aborting connection retry attempt to server.")
                 listenerManager.notifyError(exception)
                 throw exception
             }
@@ -517,10 +498,17 @@
                     logger = logger
                 )
 
-                logger.debug { "Connecting to ${handshakeConnection.infoPub}" }
-                logger.debug { "Connecting to ${handshakeConnection.infoSub}" }
+                // Note: the pub/sub info is from the perspective of the SERVER
+                val pubSub = handshakeConnection.pubSub
+                val logInfo = pubSub.reverseForClient().getLogInfo(logger.isDebugEnabled)
 
-                connect0(handshake, handshakeConnection, handshakeTimeoutSec)
+                if (logger.isDebugEnabled) {
+                    logger.debug { "Creating new handshake to $logInfo" }
+                } else {
+                    logger.info { "Creating new handshake to $logInfo" }
+                }
+
+                connect0(handshake, handshakeConnection, handshakeTimeoutSec, connectionCloseTimeoutInSeconds, connectLatch)
                 success = true
                 slowDownForException = false
@@ -541,8 +529,6 @@
                     logger.info { message }
                 }
 
-                handshake.reset()
-
                 // maybe the aeron driver isn't running? (or isn't running correctly?)
                 aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
@@ -553,26 +539,29 @@
                 slowDownForException = true
 
                 if (e.cause is ServerException) {
+                    resetOnError()
                     val cause = e.cause!!
                     val wrapped = ClientException(cause.message!!)
                     listenerManager.notifyError(wrapped)
                     throw wrapped
                 } else {
+                    resetOnError()
                     listenerManager.notifyError(e)
                     throw e
                 }
             } catch (e: Exception) {
-                logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting." }
-
                 aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
-
-                listenerManager.notifyError(e)
+                listenerManager.notifyError(ClientException("[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting.", e))
+                resetOnError()
                 throw e
             }
         }
 
         if (!success) {
+            endpointIsRunning.lazySet(false)
+
-            if (System.nanoTime() - startTime < timoutInNanos) {
+            if (System.nanoTime() - startTime < connectionTimeoutInNanos) {
+
                 val type = if (connection0 == null) {
                     "UNKNOWN"
                 } else if (isIPC) {
                     "IPC"
                 } else {
                     remoteAddressPrettyString + ":" + config.port
                 }
+
                 // we timed out. Throw the appropriate exception
-                val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds")
-                logger.error(exception) { "Aborting connection attempt to server." }
+                val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds, aborting connection attempt to server.")
                 listenerManager.notifyError(exception)
                 throw exception
             }
 
             // If we did not connect - throw an error. When `client.connect()` is called, either it connects or throws an error
-            val exception = ClientRejectedException("The server did not respond or permit the connection attempt within $connectionTimeoutSec seconds")
+            val exception = ClientRejectedException("The server did not respond or permit the connection attempt within $connectionTimeoutSec seconds, aborting connection retry attempt to server.")
             exception.cleanStackTrace()
-            logger.error(exception) { "Aborting connection retry attempt to server." }
             listenerManager.notifyError(exception)
             throw exception
         }
     }
+
+    // the handshake process might have to restart this connection process.
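For reference, a hedged sketch of what the retry loop above looks like from the caller's side. The exception types are the ones imported at the top of Client.kt; the constructor arguments and the string-based connect() overload shown here are assumptions inferred from the surrounding hunks, not verified signatures.

    // assumes: Client<Connection>(ClientConfiguration()) is constructible, and that
    // connect() blocks until connected or throws (the SYNC phase described above)
    val client = Client<Connection>(ClientConfiguration())
    try {
        client.connect("127.0.0.1", connectionTimeoutSec = 30)
    } catch (e: ClientTimedOutException) {
        // nothing answered within 30 seconds (or the aeron driver never came up)
    } catch (e: ClientRejectedException) {
        // the server responded but refused the handshake (e.g. public key mismatch)
    } catch (e: ClientShutdownException) {
        // close() was called while the retry loop was still running
    }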
-    private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: ClientHandshakeDriver, connectionTimeoutSec: Int) {
+    private suspend fun connect0(
+        handshake: ClientHandshake<CONNECTION>,
+        handshakeConnection: ClientHandshakeDriver,
+        connectionTimeoutSec: Int,
+        connectionCloseTimeoutInSeconds: Long,
+        connectLatch: CountDownLatch
+    ) {
         // this will block until the connection timeout, and throw an exception if we were unable to connect with the server
+        // throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
         val connectionInfo = handshake.hello(handshakeConnection, connectionTimeoutSec, uuid)
@@ -615,7 +612,7 @@
             handshakeConnection.close()
 
             val exception = ClientRejectedException("Connection to [$remoteAddressString] not allowed! Public key mismatch.")
-            logger.error(exception) { "Validation error" }
+            listenerManager.notifyError(exception)
             throw exception
         }
@@ -642,6 +639,7 @@
                 ClientRejectedException("[${handshake.connectKey}] Connection to [$remoteAddressString] has incorrect class registration details!!")
             }
             exception.cleanStackTraceInternal()
+            listenerManager.notifyError(exception)
             throw exception
         }
@@ -658,10 +656,15 @@
         // we are now connected, so we can connect to the NEW client-specific ports
         val clientConnection = ClientConnectionDriver.build(aeronDriver, connectionTimeoutSec, handshakeConnection, connectionInfo)
 
-        // have to rebuild the client pub/sub for the next part of the handshake (since it's a 1-shot deal for the server per session)
-        // if we go SLOWLY (slower than the linger timeout), it will work. if we go quickly, this it will have problems (so we must do this!)
-//        handshakeConnection.resetSession(logger)
+        // Note: the pub/sub info is from the perspective of the SERVER
+        val pubSub = clientConnection.connectionInfo.reverseForClient()
+        val logInfo = pubSub.getLogInfo(logger.isDebugEnabled)
 
+        if (logger.isDebugEnabled) {
+            logger.debug { "Creating new connection to $logInfo" }
+        } else {
+            logger.info { "Creating new connection to $logInfo" }
+        }
 
         val newConnection: CONNECTION
         if (handshakeConnection.pubSub.isIpc) {
@@ -677,41 +680,6 @@
             storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
         }
 
-        // This is set by the client so if there is a "connect()" call in the disconnect callback, we can have proper
-        // lock-stop ordering for how disconnect and connect work with each-other
-        // GUARANTEE that the callbacks for 'onDisconnect' happens-before the 'onConnect'.
-        // a CDL is used because it doesn't matter the order in which it's called (as it will always ensure it's correct)
-        val lockStepForConnect = dorkbox.util.sync.CountDownLatch(1)
-
-        val connectWaitTimeout = if (EventPoller.DEBUG) 99999999L else config.connectionCloseTimeoutInSeconds.toLong()
-
-
-        //////////////
-        /// Extra Close action
-        //////////////
-        newConnection.closeAction = {
-            // this is called whenever connection.close() is called by the framework or via client.close()
-            isConnected = false
-
-            // make sure to call our client.notifyConnect() callbacks execute
-
-            // force us to wait until AFTER the connect logic has run. we MUST use a CDL. A mutex doesn't work properly
-            lockStepForConnect.await(connectWaitTimeout, TimeUnit.SECONDS)
-
-            EventDispatcher.launch(EVENT.DISCONNECT) {
-                listenerManager.notifyDisconnect(connection)
-
-                // we must reset the disconnect-in-progress latch AND count down, so that reconnects can successfully reconnect
-                val disconnectCDL = disconnectInProgress.getAndSet(null)!!
-                disconnectCDL.countDown()
-            }
-        }
-
-        // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
-        EventDispatcher.launch(EVENT.INIT) {
-            listenerManager.notifyInit(newConnection)
-        }
-
         connection0 = newConnection
         addConnection(newConnection)
@@ -721,54 +689,57 @@
         try {
             handshake.done(handshakeConnection, clientConnection, connectionTimeoutSec, handshakeConnection.details)
         } catch (e: Exception) {
-            logger.error(e) { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] error during handshake" }
+            listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] error during handshake", e))
             throw e
         }
 
         // finished with the handshake, so always close these!
         handshakeConnection.close()
 
-        isConnected = true
-
         logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] done with handshake." }
 
+        // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
+        listenerManager.notifyInit(newConnection)
+
         // have to make a new thread to listen for incoming data!
         // SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
-
-        // additionally, if we have MULTIPLE clients on the same machine, we are limited by the CPU core count. Ideally we want to share this among ALL clients within the same JVM so that we can support multiple clients/servers
-        networkEventPoller.submit {
-            if (!isShutdown()) {
-                if (!newConnection.isClosedViaAeron()) {
-                    // Polls the AERON media driver subscription channel for incoming messages
-                    newConnection.poll()
-                } else {
-                    // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
-                    logger.debug { "[${handshakeConnection.details}] connection expired (cleanup)" }
-
-                    // When we close via a message (or when the connection timeout has expired), we do not flush the state!
-                    // NOTE: the state is ONLY flushed when client.close() is called!
-                    EventDispatcher.launch(EVENT.CLOSE) {
-                        newConnection.close(enableRemove = true)
-                    }
-
-                    // remove ourselves from processing
-                    EventPoller.REMOVE
-                }
-            } else {
+        networkEventPoller.submit(
+            action = {
+                // if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
+                // we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
+                if (!(shutdownEventPoller || newConnection.isClosedViaAeron())) {
+                    newConnection.poll()
+                } else {
+                    // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
+                    logger.debug { "[${connection}] connection expired (cleanup)" }
+
+                    // the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
+                    removeConnection(newConnection)
+
+                    // we already removed the connection, we can call it again without side effects
+                    newConnection.close()
+
                     // remove ourselves from processing
                     EventPoller.REMOVE
                 }
-        }
+            },
+            onShutdown = {
+                // this can be closed when the connection is remotely closed in ADDITION to manually closing
+                logger.debug { "Client event dispatch closing..." }
 
-        // if something inside-of listenerManager.notifyConnect is blocking or suspends, then polling will never happen!
-        // This must be on a different thread
-        EventDispatcher.launch(EVENT.CONNECT) {
-            // what happens if the disconnect runs INSIDE the connect?
-            listenerManager.notifyConnect(newConnection)
+                // we only need to run shutdown methods if there was a network outage or D/C
+                if (!shutdownInProgress.value) {
+                    this@Client.closeSuspending(true)
+                }
 
-            // now the disconnect logic can run because we are done with the connect logic.
-            lockStepForConnect.countDown()
-        }
+                // we can now call connect again
+                endpointIsRunning.lazySet(false)
+                pollerClosedLatch.countDown()
+
+                logger.debug { "Closed the Network Event Poller..." }
+            })
+
+        listenerManager.notifyConnect(newConnection)
     }
 
     /**
@@ -790,7 +761,7 @@
         get() = connection.isNetwork
 
     /**
-     * @return the connection (TCP or IPC) id of this connection.
+     * @return the connection id of this connection.
      */
     val id: Int
         get() = connection.id
@@ -813,27 +784,8 @@
         return if (c != null) {
             c.send(message)
         } else {
-            val exception = ClientException("Cannot send a message when there is no connection!")
-            logger.error(exception) { "No connection!" }
-            false
-        }
-    }
-
-    /**
-     * Sends a message to the server, if the connection is closed for any reason, this returns false.
-     *
-     * @return true if the message was sent successfully, false if the connection has been closed
-     */
-    fun sendBlocking(message: Any): Boolean {
-        val c = connection0
-
-        return if (c != null) {
-            runBlocking {
-                c.send(message)
-            }
-        } else {
-            val exception = ClientException("Cannot send a message when there is no connection!")
-            logger.error(exception) { "No connection!" }
+            val exception = TransmitException("Cannot send a message when there is no connection!")
+            listenerManager.notifyError(exception)
             false
         }
     }
@@ -851,23 +803,13 @@
         if (c != null) {
             return super.ping(c, pingTimeoutSeconds, function)
         } else {
-            logger.error(ClientException("Cannot send a ping when there is no connection!")) { "No connection!" }
+            val exception = TransmitException("Cannot send a ping when there is no connection!")
+            listenerManager.notifyError(exception)
         }
 
         return false
     }
 
-    /**
-     * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
-     *
-     * @param function called when the ping returns (ie: update time/latency counters/metrics/etc)
-     */
-    fun pingBlocking(pingTimeoutSeconds: Int = config.pingTimeoutSeconds, function: suspend Ping.() -> Unit): Boolean {
-        return runBlocking {
-            ping(pingTimeoutSeconds, function)
-        }
-    }
-
     /**
      * Removes the specified host address from the list of registered server keys.
      */
@@ -879,8 +821,24 @@
         }
     }
 
-    final override suspend fun close0() {
-        // no impl
+    /**
+     * Will throw an exception if there are resources that are still in use
+     */
+    fun checkForMemoryLeaks() {
+        AeronDriver.checkForMemoryLeaks()
+    }
+
+    /**
+     * If you call close() on the client endpoint, it will shut down all parts of the endpoint (listeners, driver, event polling, etc).
+     */
+    fun close() {
+        runBlocking {
+            closeSuspending()
+        }
+    }
+
+    override fun toString(): String {
+        return "EndPoint [Client: $uuid]"
     }
 
     fun <R> use(block: (Client<CONNECTION>) -> R): R {
         return try {
             block(this)
         } finally {
             close()
-            runBlocking {
-                waitForClose()
-                logger.error { "finished close event" }
-            }
         }
     }
 }
diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt
index 2b6a0407..572da8c1 100644
--- a/src/dorkbox/network/Server.kt
+++ b/src/dorkbox/network/Server.kt
@@ -21,18 +21,14 @@ import dorkbox.network.aeron.EventPoller
 import dorkbox.network.connection.Connection
 import dorkbox.network.connection.ConnectionParams
 import dorkbox.network.connection.EndPoint
-import dorkbox.network.connection.EventDispatcher
-import dorkbox.network.connection.EventDispatcher.Companion.EVENT
 import dorkbox.network.connection.IpInfo
 import dorkbox.network.connection.IpInfo.Companion.IpListenType
 import dorkbox.network.connectionType.ConnectionRule
-import dorkbox.network.exceptions.AllocationException
 import dorkbox.network.exceptions.ServerException
 import dorkbox.network.handshake.ServerHandshake
 import dorkbox.network.handshake.ServerHandshakePollers
 import dorkbox.network.ipFilter.IpFilterRule
 import dorkbox.network.rmi.RmiSupportServer
-import kotlinx.atomicfu.atomic
 import kotlinx.coroutines.runBlocking
 import mu.KotlinLogging
 import java.util.concurrent.*
@@ -137,9 +133,7 @@
         val timeout = TimeUnit.SECONDS.toMillis(configuration.connectionCloseTimeoutInSeconds.toLong() * 2)
 
         val logger = KotlinLogging.logger(Server::class.java.simpleName)
-        AeronDriver(configuration, logger).use {
-            it.ensureStopped(timeout, 500)
-        }
+        AeronDriver.ensureStopped(configuration, logger, timeout)
     }
 
     /**
@@ -151,9 +145,7 @@
      */
     fun isRunning(configuration: ServerConfiguration): Boolean = runBlocking {
         val logger = KotlinLogging.logger(Server::class.java.simpleName)
-        AeronDriver(configuration, logger).use {
-            it.isRunning()
-        }
+        AeronDriver.isRunning(configuration, logger)
     }
 
     init {
@@ -167,20 +159,6 @@
      */
     val rmiGlobal = RmiSupportServer(logger, rmiGlobalSupport)
 
-    /**
-     * @return true if this server has successfully bound to an IP address and is running
-     */
-    private var bindAlreadyCalled = atomic(false)
-
-    /**
-     * These are run in lock-step to shutdown/close the server. Afterwards, bind() can be called again
-     */
-    @Volatile
-    private var shutdownPollLatch = dorkbox.util.sync.CountDownLatch(0)
-
-    @Volatile
-    private var shutdownEventLatch = dorkbox.util.sync.CountDownLatch(0)
-
     /**
     * Maintains a thread-safe collection of rules used to define the connection type with this server.
      */
@@ -191,6 +169,9 @@
      */
     internal val ipInfo = IpInfo(config)
 
+    @Volatile
+    internal lateinit var handshake: ServerHandshake<CONNECTION>
+
     final override fun newException(message: String, cause: Throwable?): Throwable {
         return ServerException(message, cause)
     }
@@ -220,23 +201,20 @@
         try {
             startDriver()
             verifyState()
-            initializeLatch()
+            initializeState()
         } catch (e: Exception) {
             resetOnError()
             listenerManager.notifyError(ServerException("Unable to start the server!", e))
             return@runBlocking
         }
 
-        shutdownPollLatch = dorkbox.util.sync.CountDownLatch(1)
-        shutdownEventLatch = dorkbox.util.sync.CountDownLatch(1)
-
         config as ServerConfiguration
 
         // we are done with initial configuration, now initialize aeron and the general state of this endpoint
         val server = this@Server
-        val handshake = ServerHandshake(logger, config, listenerManager, aeronDriver)
+        handshake = ServerHandshake(config, listenerManager, aeronDriver)
 
         val ipcPoller: AeronPoller = if (config.enableIpc) {
             ServerHandshakePollers.ipc(server, handshake)
@@ -251,26 +229,20 @@
             IpListenType.IPv6Wildcard -> ServerHandshakePollers.ip6(server, handshake)
             IpListenType.IPv4 -> ServerHandshakePollers.ip4(server, handshake)
             IpListenType.IPv6 -> ServerHandshakePollers.ip6(server, handshake)
-            IpListenType.IPC -> ServerHandshakePollers.disabled("IP Disabled")
+            IpListenType.IPC -> ServerHandshakePollers.disabled("IPv4/6 Disabled")
         }
 
         logger.info { ipcPoller.info }
         logger.info { ipPoller.info }
 
-        // additionally, if we have MULTIPLE clients on the same machine, we are limited by the CPU core count. Ideally we want to share this among ALL clients within the same JVM so that we can support multiple clients/servers
         networkEventPoller.submit(
-        action = {
-            if (!isShutdown()) {
-                var pollCount = 0
-
+            action = {
+                if (!shutdownEventPoller) {
                     // NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
                     // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
 
-                // this checks to see if there are NEW clients on the handshake ports
-                pollCount += ipPoller.poll()
-
-                // this checks to see if there are NEW clients via IPC
-                pollCount += ipcPoller.poll()
+                    // this checks to see if there are NEW clients to handshake with
+                    var pollCount = ipcPoller.poll() + ipPoller.poll()
 
                     // this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
                     // so we can modify this as we go
@@ -280,25 +252,13 @@
                         pollCount += connection.poll()
                     } else {
                         // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
-                        logger.debug { "[${connection.details}] connection expired (cleanup)" }
+                        logger.debug { "[${connection}] connection expired (cleanup)" }
 
-                        // the connection MUST be removed in the same thread that is processing events
+                        // the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
                         removeConnection(connection)
 
-                        // this will call removeConnection again, but that is ok
-                        EventDispatcher.launch(EVENT.CLOSE) {
-                            // we already removed the connection
-                            connection.close(enableRemove = false)
-
-                            // have to manually notify the server-listenerManager that this connection was closed
-                            // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listener-manager is
-                            // instantly notified and on cleanup, the server-listener-manager is called
-
-                            // this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
-                            EventDispatcher.launch(EVENT.DISCONNECT) {
-                                listenerManager.notifyDisconnect(connection)
-                            }
-                        }
+                        // we already removed the connection, we can call it again without side effects
+                        connection.close()
                     }
                 }
@@ -311,63 +271,21 @@
             onShutdown = {
                 logger.debug { "Server event dispatch closing..." }
 
-                // we want to process **actual** close cleanup events on this thread as well, otherwise we will have threading problems
-                shutdownPollLatch.await()
+                ipcPoller.close()
+                ipPoller.close()
 
-                // we want to clear all the connections FIRST (since we are shutting down)
-                val cons = mutableListOf<CONNECTION>()
-                connections.forEach { cons.add(it) }
-                connections.clear()
+                // clear all the handshake info
+                handshake.clear()
 
+                // we can now call bind again
+                endpointIsRunning.lazySet(false)
+                pollerClosedLatch.countDown()
 
-                // when we close a client or a server, we want to make sure that ALL notifications are finished.
-                // when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
-
-                // we have to manually clean-up the connections and call server-notifyDisconnect because otherwise this will never get called
-                try {
-                    cons.forEach { connection ->
-                        logger.info { "[${connection.details}] Connection cleanup and close" }
-                        // make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
-
-                        EventDispatcher.launch(EVENT.CLOSE) {
-                            connection.close(enableRemove = true)
-
-                            // have to manually notify the server-listenerManager that this connection was closed
-                            // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
-                            // instantly notified and on cleanup, the server-listenermanager is called
-                            // NOTE: this must be the LAST thing happening!
-
-                            // the SERVER cannot re-connect to clients, only clients can call 'connect'.
-                            EventDispatcher.launch(EVENT.DISCONNECT) {
-                                listenerManager.notifyDisconnect(connection)
-                            }
-                        }
-                    }
-                } finally {
-                    ipcPoller.close()
-                    ipPoller.close()
-
-                    // clear all the handshake info
-                    handshake.clear()
-
-                    try {
-                        AeronDriver.checkForMemoryLeaks()
-
-                        // make sure that we have de-allocated all connection data
-                        handshake.checkForMemoryLeaks()
-                    } catch (e: AllocationException) {
-                        logger.error(e) { "Error during server cleanup" }
-                    }
-
-                    // finish closing -- this lets us make sure that we don't run into race conditions on the thread that calls close()
-                    try {
-                        shutdownEventLatch.countDown()
-                    } catch (ignored: Exception) {
-                    }
-                }
+                logger.debug { "Closed the Network Event Poller..." }
             })
     }
+
     /**
      * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have.
      * - NOTHING : Nothing happens to the in/out bytes
@@ -435,32 +353,29 @@
         }
     }
 
-    fun close() {
-        close(false) {}
-    }
+    /**
+     * Will throw an exception if there are resources that are still in use
+     */
+    fun checkForMemoryLeaks() {
+        AeronDriver.checkForMemoryLeaks()
 
-    fun close(onCloseFunction: () -> Unit = {}) {
-        close(false, onCloseFunction)
-    }
-
-    final override fun close(shutdownEndpoint: Boolean, onCloseFunction: () -> Unit) {
-        super.close(shutdownEndpoint, onCloseFunction)
+        // make sure that we have de-allocated all connection data
+        handshake.checkForMemoryLeaks()
     }
 
     /**
-     * Closes the server and all it's connections. After a close, you may call 'bind' again.
+     * If you call close() on the server endpoint, it will shut down all parts of the endpoint (listeners, driver, event polling, etc).
      */
-    final override suspend fun close0() {
-        // when we call close, it will shutdown the polling mechanism, then wait for us to tell it to clean-up connections.
-        //
-        // Aeron + the Media Driver will have already been shutdown at this point.
-        if (bindAlreadyCalled.getAndSet(false)) {
-            // These are run in lock-step
-            shutdownPollLatch.countDown()
-            shutdownEventLatch.await()
+    fun close() {
+        runBlocking {
+            closeSuspending()
         }
     }
 
+    override fun toString(): String {
+        return "EndPoint [Server]"
+    }
+
     /**
     * Enable
     */
     fun <R> use(block: (Server<CONNECTION>) -> R): R {
         return try {
             block(this)
         } finally {
             close()
-
-            runBlocking {
-                waitForClose()
-                logger.error { "finished close event" }
-            }
         }
     }
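Taken together, client and server now follow the same lifecycle. A hedged end-to-end sketch under stated assumptions: the bind() call and the suspend send() are inferred from context elsewhere in the codebase and are not shown in this patch, and the constructor arguments are assumed. The key behavioral point is visible above: close() wraps closeSuspending() in runBlocking, so it does not return until the network event poller has shut down, which is why the use { } blocks no longer need an explicit waitForClose().

    import kotlinx.coroutines.runBlocking

    fun main() {
        val server = Server<Connection>(ServerConfiguration())
        server.bind() // ports are assumed to come from ServerConfiguration

        Client<Connection>(ClientConfiguration()).use { client ->
            client.connect("127.0.0.1") // blocks until connected (default 30s timeout)
            runBlocking {
                client.send("hello") // assumed suspend; returns false if the connection dropped
            }
        } // use {} invokes the new blocking close(); no waitForClose() needed

        server.close()               // shuts down pollers, driver, and event dispatch
        server.checkForMemoryLeaks() // optional: verify all connection data was freed
    }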