diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 43fd99e5..013b6a7b 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -29,7 +29,10 @@ import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.connection.streaming.StreamingControl import dorkbox.network.connection.streaming.StreamingData import dorkbox.network.connection.streaming.StreamingManager -import dorkbox.network.exceptions.AllocationException +import dorkbox.network.exceptions.MessageDispatchException +import dorkbox.network.exceptions.PingException +import dorkbox.network.exceptions.RMIException +import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.StreamingException import dorkbox.network.ping.Ping import dorkbox.network.ping.PingManager @@ -42,6 +45,7 @@ import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.SettingsStore import io.aeron.Publication import io.aeron.logbuffer.Header +import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob @@ -53,7 +57,6 @@ import mu.KotlinLogging import org.agrona.DirectBuffer import org.agrona.MutableDirectBuffer import org.agrona.concurrent.IdleStrategy -import java.util.* import java.util.concurrent.* // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! @@ -123,14 +126,14 @@ abstract class EndPoint private constructor(val type: C } } - - // TODO: add UUID to handshake so that connections from BOTH endpoints have the same uuid (so we know if there is a reconnect) + /** + * The UUID is a unique, in-memory instance that is created on object construction + */ + val uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate() // the ID would be different?? but the UUID would be the same?? val logger: KLogger = KotlinLogging.logger(loggerName) - val uuid : UUID - // this is rather silly, BUT if there are more complex errors WITH the coroutine that occur, a regular try/catch WILL NOT catch it. // ADDITIONALLY, an error handler is ONLY effective at the first, top-level `launch`. IT WILL NOT WORK ANY OTHER WAY. private val errorHandler = CoroutineExceptionHandler { _, exception -> @@ -164,9 +167,26 @@ abstract class EndPoint private constructor(val type: C */ internal val crypto: CryptoManagement - // this barrier only prevents multiple shutdowns (in the event this close() is called multiple timees) + // manage the startup state of the endpoint. True if the endpoint is running + internal val endpointIsRunning = atomic(false) + + // this only prevents multiple shutdowns (in the event this close() is called multiple times) @Volatile - private var shutdownLatch: dorkbox.util.sync.CountDownLatch + private var shutdown = false + internal val shutdownInProgress = atomic(false) + + @Volatile + internal var shutdownEventPoller = false + + @Volatile + private var shutdownLatch = dorkbox.util.sync.CountDownLatch(0) + + /** + * This is run in lock-step to shutdown/close the client/server event poller. Afterwards, connect/bind can be called again + */ + @Volatile + internal var pollerClosedLatch = dorkbox.util.sync.CountDownLatch(0) + /** * Returns the storage used by this endpoint. 
This is the backing data structure for key/value pairs, and can be a database, file, etc
@@ -182,7 +202,6 @@ abstract class EndPoint private constructor(val type: C

     private val pingManager = PingManager()

-
     init {
         if (DEBUG_CONNECTIONS) {
             logger.error { "DEBUG_CONNECTIONS is enabled. This should not happen in release!" }
@@ -190,10 +209,6 @@ abstract class EndPoint private constructor(val type: C

         config.validate() // this happens more than once! (this is ok)

-        // there are threading issues if there are client(s) and server's within the same JVM, where we have thread starvation
-        networkEventPoller.configure(logger, config)
-
-
         // serialization stuff
         @Suppress("UNCHECKED_CAST")
         serialization = config.serialization as Serialization
@@ -216,14 +231,33 @@ abstract class EndPoint private constructor(val type: C
         // we have to be able to specify the property store
         storage = SettingsStore(config.settingsStore, logger)
         crypto = CryptoManagement(logger, storage, type, config.enableRemoteSignatureValidation)
-        uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate()

         // Only starts the media driver if we are NOT already running!
+        // NOTE: in the event that we are IPC -- only ONE SERVER can be running IPC at a time for a single driver!
+        if (type == Server::class.java && config.enableIpc) {
+            runBlocking {
+                val configuration = config.copy()
+                if (AeronDriver.isLoaded(configuration, logger)) {
+                    val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change its directory: ${configuration.aeronDirectory}")
+                    logger.error("Error initializing server", e)
+                    throw e
+                }
+
+                if (AeronDriver.isRunning(configuration, logger)) {
+                    val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change its directory: ${configuration.aeronDirectory}")
+                    logger.error("Error initializing server", e)
+                    throw e
+                }
+            }
+        }
+
         try {
             @Suppress("LeakingThis")
             aeronDriver = AeronDriver(this)
         } catch (e: Exception) {
-            logger.error("Error initializing endpoint", e)
+            logger.error("Error initializing server", e)
             throw e
         }
@@ -243,6 +277,18 @@ abstract class EndPoint private constructor(val type: C
         }
     }

+    internal fun isServer(function: Server.() -> Unit) {
+        if (type == Server::class.java) {
+            function(this as Server)
+        }
+    }
+
+    internal fun isClient(function: Client.() -> Unit) {
+        if (type == Client::class.java) {
+            function(this as Client)
+        }
+    }
+
     /**
      * Make sure that the different dispatchers are currently active.
      *
@@ -257,14 +303,17 @@ abstract class EndPoint private constructor(val type: C
      *
      * The client calls this every time it attempts a connection.
      */
-    internal fun initializeLatch() {
-        if (shutdownLatch.count == 0) {
-            shutdownLatch.countUp()
-        }
+    internal fun initializeState() {
+        shutdownLatch = dorkbox.util.sync.CountDownLatch(1)
+        pollerClosedLatch = dorkbox.util.sync.CountDownLatch(1)
+        endpointIsRunning.lazySet(true)
+        shutdown = false
+        shutdownEventPoller = false
+        aeronDriver.resetUdpSessionId()

-        if (canCloseLatch.count == 0) {
-            canCloseLatch.countUp()
-        }
+        // there are threading issues if there are clients and servers within the same JVM, where we have thread starvation
+        // this resolves the problem. Additionally, this is tied to a specific endpoint instance
+        networkEventPoller.configure(logger, config, this)
     }

     /**
@@ -292,8 +341,8 @@ abstract class EndPoint private constructor(val type: C
     * Stops the network driver.
* * @param forceTerminate if true, then there is no caution when restarting the Aeron driver, and any other process on the machine using - * the same driver will probably crash (unless they have been appropriately stopped). If false (the default), then the Aeron driver is - * only stopped if it is safe to do so + * the same driver will probably crash (unless they have been appropriately stopped). + * If false, then the Aeron driver is only stopped if it is safe to do so */ suspend fun stopDriver(forceTerminate: Boolean = false) { if (forceTerminate) { @@ -344,6 +393,8 @@ abstract class EndPoint private constructor(val type: C * * NOTE: This callback is executed IN-LINE with network IO, so one must be very careful about what is executed. * + * Things that happen in this event are TIME-CRITICAL, and must happen before anything else. If you block here, you will block network IO + * * For a server, this function will be called for ALL client connections. */ fun onInit(function: suspend CONNECTION.() -> Unit){ @@ -502,20 +553,20 @@ abstract class EndPoint private constructor(val type: C // the remote endPoint will send this message if it is closing the connection. // IF we get this message in time, then we do not have to wait for the connection to expire before closing it is DisconnectMessage -> { - // NOTE: This MUST be on a new co-routine - EventDispatcher.CLOSE.launch { - connection.close(enableRemove = true) + // NOTE: This MUST be on a new co-routine (this is...) + runBlocking { + connection.close() } } is Ping -> { // PING will also measure APP latency, not just NETWORK PIPE latency - // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RMI!) + // NOTE: This MUST be on a new co-routine, specifically the messageDispatch messageDispatch.launch { try { pingManager.manage(connection, responseManager, message, logger) } catch (e: Exception) { - listenerManager.notifyError(connection, e) + listenerManager.notifyError(connection, PingException("Error while processing Ping message", e)) } } } @@ -526,12 +577,12 @@ abstract class EndPoint private constructor(val type: C is RmiMessage -> { // if we are an RMI message/registration, we have very specific, defined behavior. // We do not use the "normal" listener callback pattern because this requires special functionality - // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RMI!) + // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RESPONSE_MANAGER!) 
messageDispatch.launch { try { rmiGlobalSupport.processMessage(serialization, connection, message, rmiConnectionSupport, responseManager, logger) } catch (e: Exception) { - listenerManager.notifyError(connection, e) + listenerManager.notifyError(connection, RMIException("Error while processing RMI message", e)) } } } @@ -552,8 +603,7 @@ abstract class EndPoint private constructor(val type: C try { streamingManager.processDataMessage(message, this@EndPoint, connection) } catch (e: Exception) { - val newException = StreamingException("Error processing StreamingMessage", e) - listenerManager.notifyError(connection, newException) + listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e)) } } @@ -568,17 +618,16 @@ abstract class EndPoint private constructor(val type: C hasListeners = hasListeners or connection.notifyOnMessage(message) if (!hasListeners) { - logger.error("No message callbacks found for ${message::class.java.name}") + listenerManager.notifyError(connection, MessageDispatchException("No message callbacks found for ${message::class.java.name}")) } } catch (e: Exception) { - logger.error("Error processing message ${message::class.java.name}", e) - listenerManager.notifyError(connection, e) + listenerManager.notifyError(connection, MessageDispatchException("Error processing message ${message::class.java.name}", e)) } } } else -> { - logger.error("Unknown message received!!") + listenerManager.notifyError(connection, MessageDispatchException("Unknown message received!!")) } } } @@ -601,7 +650,7 @@ abstract class EndPoint private constructor(val type: C try { // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! - val message = readKryo.read(buffer, offset, length, connection) + val message = readKryo.read(buffer, offset, length, connection) logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" } processMessage(message, connection) } catch (e: Exception) { @@ -811,25 +860,35 @@ abstract class EndPoint private constructor(val type: C return shutdown } + /** * Waits for this endpoint to be closed */ - suspend fun waitForClose() { - if (!shutdown) { - // we're not shutdown, so don't even bother waiting - return - } + suspend fun waitForClose(): Boolean { + return waitForClose(0L) + } - while (latch !== shutdownLatch) { - latch = shutdownLatch - // if we are restarting the network state, we want to continue to wait for a proper close event. Because we RESET the latch, - // we must continue to check + /** + * Waits for this endpoint to be closed. + * + * @return true if the wait completed before the timeout + */ + suspend fun waitForClose(timeoutMS: Long = 0L): Boolean { + logger.error { "WAITING FOR CLOSE $timeoutMS endpoint" } + // if we are restarting the network state, we want to continue to wait for a proper close event. + // when shutting down, it can take up to 5 seconds to fully register as "shutdown" - logger.error { "waiting for close event to finish" } - latch.await() + return if (timeoutMS > 0) { + pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) && shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS) + } else { + pollerClosedLatch.await() + logger.error { "waiting for shutdown" } + shutdownLatch.await() + true } } + /** * Shall we preserve state when we shutdown, or do we remove all onConnect/Disconnect/etc events from memory. 
* @@ -837,98 +896,97 @@ abstract class EndPoint private constructor(val type: C * 1) We should reset 100% of the state+events, so that every time we connect, everything is redone * 2) We preserve the state+event, BECAUSE adding the onConnect/Disconnect/message event states might be VERY expensive. * - * If we call "close" multiple times, we want to run the requested logic + onCloseFunction EACH TIME IT'S CALLED! + * NOTE: This method does NOT block, as the connection state is asynchronous. Use "waitForClose()" to wait for this to finish + * + * @param clientConnectionDC this is only true when a connection is closed in the client. */ - suspend fun shutdown(shutdownEndpoint: Boolean = false, onCloseFunction: () -> Unit) { - // we must set the shutdown state immediately - shutdown = true + internal suspend fun closeSuspending(clientConnectionDC: Boolean = false) { + // 1) endpoints can call close() + // 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually) + val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true) + if (shutdownPreviouslyStarted && clientConnectionDC) { + // this is only called when the client network event poller shuts down + // if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely) - EventDispatcher.launchSequentially(EventDispatcher.CLOSE) { - logger.info { "Shutting down..." } + // Clears out all registered events + listenerManager.close() - // if we were ALREADY shutdown, just run the appropriate logic again (but not ALL the logic) + // Remove from memory the data from the back-end storage + storage.close() + aeronDriver.close() + // don't do anything more, since we've already shutdown! + return + } - if (!shutdownEndpoint) { - closeAction() - } else { - closeAction { - // Connections MUST be closed first, because we want to make sure that no RMI messages can be received - // when we close the RMI support objects (in which case, weird - but harmless - errors show up) - // this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method) - responseManager.close() + logger.error { "Requesting endpoint shutdown for ${type.simpleName} shutdownPreviouslyStarted=$shutdownPreviouslyStarted" } + EventDispatcher.CLOSE.launch { + logger.debug { "Shutting down endpoint..." } + + // always do this. It is OK to run this multiple times + // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections + // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() +logger.error("CLOSING CONNECTIONS FROM CLOSE EVENT: ${connections.size()}") + connections.forEach { + it.closeImmediately() + } + + // don't do these things if we are "closed" from a client connection disconnect + if (!clientConnectionDC && !shutdownPreviouslyStarted) { + // THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER! + shutdownEventPoller = true + // if we close the poller AND listener manager too quickly, events will not get published + pollerClosedLatch.await() + } + + // this will ONLY close the event dispatcher if ALL endpoints have closed it. + // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed. 
+ networkEventPoller.close(logger, this) + + // Connections MUST be closed first, because we want to make sure that no RMI messages can be received + // when we close the RMI support objects (in which case, weird - but harmless - errors show up) + // this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method) + responseManager.close() + + // don't do these things if we are "closed" from a client connection disconnect + if (!clientConnectionDC) { + // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done + EventDispatcher.launchSequentially(EventDispatcher.CLOSE) { // Clears out all registered events listenerManager.close() // Remove from memory the data from the back-end storage storage.close() - // only on the server, this shuts down the poll/event dispatchers - // This cannot be called from the event dispatcher! - close0() + aeronDriver.close() - if (this is Server<*>) { - try { - // make sure that we have de-allocated all connection data - AeronDriver.checkForMemoryLeaks() - handshake.checkForMemoryLeaks() - } catch (e: AllocationException) { - logger.error(e) { "Error during server cleanup" } - } - } + // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on + shutdown = true + shutdownLatch.countDown() + shutdownInProgress.lazySet(false) + logger.info { "Done shutting down endpoint." } } + } else { + // when the client connection is closed, we don't close the driver/etc. + shutdown = true + shutdownLatch.countDown() + shutdownInProgress.lazySet(false) + logger.info { "Done shutting down endpoint." } } - - onCloseFunction() - - logger.info { "Done shutting down..." } - - // if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now) - shutdownLatch.countDown() - canCloseLatch.countDown() } - - - // when the endpoint is closed, we must wait until after ALL CLOSE events are called! - logger.info { "Waiting for close listener to finish, then shutting down..." + EventDispatcher.getExecutingDispatchEvent() } - canCloseLatch.await() } - private suspend fun closeAction(extraActions: suspend () -> Unit = {}) { - // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections - // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() - val enableRemove = type == Client::class.java - connections.forEach { - logger.info { "[${it}] Closing connection" } - it.close(enableRemove) - } - - logger.error { "CLOSE ACTION 2" } - // must run after connections have been closed, but before anything else happens - extraActions() - logger.error { "CLOSE ACTION 3" } - close0() - logger.error { "CLOSE ACTION 4" } - // this will ONLY close the event dispatcher if ALL endpoints have closed it. - // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed. 
- networkEventPoller.close(logger) - - aeronDriver.close() - - messageDispatch.cancel("${type.simpleName} shutting down") - - // if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now) + /** + * Reset the running state when there's an error starting up + */ + internal fun resetOnError() { shutdownLatch.countDown() - } - - - internal open suspend fun close0() {} - - - override fun toString(): String { - return "EndPoint [${type.simpleName}] $uuid" + pollerClosedLatch.countDown() + endpointIsRunning.lazySet(false) + shutdown = false + shutdownEventPoller = false } override fun hashCode(): Int {
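
The shutdown rework in this patch replaces the single shutdown latch with an atomic "shutdown in progress" guard plus two latches (shutdownLatch and pollerClosedLatch), and initializeState() re-arms both latches so connect/bind can be called again after a close. The standalone sketch below models that lifecycle only; it is illustrative, not the library's API. It assumes the JDK's AtomicBoolean and CountDownLatch in place of kotlinx-atomicfu and dorkbox.util.sync, and the names LifecycleSketch and onPollerClosed are hypothetical.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Simplified model of the endpoint lifecycle introduced in this change:
// - initializeState() re-arms the latches so a later connect/bind can reuse the endpoint
// - close() is guarded by an atomic flag so concurrent callers cannot run the shutdown twice
// - waitForClose(timeoutMS) blocks until both the poller and the endpoint have finished closing
class LifecycleSketch {
    // true while a close is being performed (hypothetical stand-in for the atomicfu field)
    private val shutdownInProgress = AtomicBoolean(false)

    @Volatile private var shutdownLatch = CountDownLatch(0)
    @Volatile private var pollerClosedLatch = CountDownLatch(0)

    /** Called before every connect/bind attempt, mirroring initializeState(). */
    fun initializeState() {
        shutdownLatch = CountDownLatch(1)
        pollerClosedLatch = CountDownLatch(1)
        shutdownInProgress.set(false)
    }

    /** The event poller calls this once its poll loop has fully exited. */
    fun onPollerClosed() {
        pollerClosedLatch.countDown()
    }

    /** Guarded shutdown: only the first caller performs the close work. */
    fun close() {
        if (shutdownInProgress.getAndSet(true)) {
            return // a shutdown is already running
        }
        try {
            // ... close connections, stop the poller, shut down the driver ...
        } finally {
            shutdownLatch.countDown()
        }
    }

    /** Returns true if the close finished before the timeout (0 = wait forever). */
    fun waitForClose(timeoutMS: Long = 0L): Boolean {
        return if (timeoutMS > 0) {
            pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) &&
                    shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
        } else {
            pollerClosedLatch.await()
            shutdownLatch.await()
            true
        }
    }
}

With this shape, a caller can run initializeState(), connect, later call close(), and then waitForClose(10_000) to be certain the poll loop has exited before binding or connecting again, which is the restart guarantee the patch's pollerClosedLatch is meant to provide.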