Code polish/cleanup

commit 8e30883ebc (parent c9e71c56b1)
Author: Robinson
Date:   2023-06-25 12:05:41 +02:00
Signature: GPG Key ID 8E7DB78588BD6F5C (no known key found for this signature in database)

@@ -29,7 +29,10 @@ import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
 import dorkbox.network.connection.streaming.StreamingControl
 import dorkbox.network.connection.streaming.StreamingData
 import dorkbox.network.connection.streaming.StreamingManager
-import dorkbox.network.exceptions.AllocationException
+import dorkbox.network.exceptions.MessageDispatchException
+import dorkbox.network.exceptions.PingException
+import dorkbox.network.exceptions.RMIException
+import dorkbox.network.exceptions.ServerException
 import dorkbox.network.exceptions.StreamingException
 import dorkbox.network.ping.Ping
 import dorkbox.network.ping.PingManager
@@ -42,6 +45,7 @@ import dorkbox.network.serialization.Serialization
 import dorkbox.network.serialization.SettingsStore
 import io.aeron.Publication
 import io.aeron.logbuffer.Header
+import kotlinx.atomicfu.atomic
 import kotlinx.coroutines.CoroutineExceptionHandler
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.SupervisorJob
@@ -53,7 +57,6 @@ import mu.KotlinLogging
 import org.agrona.DirectBuffer
 import org.agrona.MutableDirectBuffer
 import org.agrona.concurrent.IdleStrategy
-import java.util.*
 import java.util.concurrent.*

 // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
@@ -123,14 +126,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         }
     }

-    // TODO: add UUID to handshake so that connections from BOTH endpoints have the same uuid (so we know if there is a reconnect)
+    /**
+     * The UUID is a unique, in-memory instance that is created on object construction
+     */
+    val uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate()

     // the ID would be different?? but the UUID would be the same??
     val logger: KLogger = KotlinLogging.logger(loggerName)
-    val uuid : UUID

     // this is rather silly, BUT if there are more complex errors WITH the coroutine that occur, a regular try/catch WILL NOT catch it.
     // ADDITIONALLY, an error handler is ONLY effective at the first, top-level `launch`. IT WILL NOT WORK ANY OTHER WAY.
     private val errorHandler = CoroutineExceptionHandler { _, exception ->
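
The comment above is worth illustrating, because it regularly trips people up: a CoroutineExceptionHandler is only consulted for exceptions that reach a root coroutine. A minimal sketch (plain kotlinx.coroutines, not code from this commit):

    import kotlinx.coroutines.CoroutineExceptionHandler
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.launch

    fun main() {
        val handler = CoroutineExceptionHandler { _, e ->
            println("caught: ${e.message}")
        }

        // EFFECTIVE: installed in the scope, so it applies to the top-level launch
        val scope = CoroutineScope(SupervisorJob() + handler)
        scope.launch {
            error("boom")           // reaches the handler above; try/catch around launch would NOT
        }

        // INEFFECTIVE: a handler passed to a nested launch is ignored; the child's
        // failure propagates to its parent instead
        scope.launch {
            launch(CoroutineExceptionHandler { _, _ -> println("never printed") }) {
                error("inner boom") // handled by the scope's handler, not the inner one
            }
        }

        Thread.sleep(200)           // crude wait so the demo can finish (sketch only)
    }
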
@@ -164,9 +167,26 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
      */
    internal val crypto: CryptoManagement

-    // this barrier only prevents multiple shutdowns (in the event this close() is called multiple timees)
+    // manage the startup state of the endpoint. True if the endpoint is running
+    internal val endpointIsRunning = atomic(false)
+
+    // this only prevents multiple shutdowns (in the event this close() is called multiple times)
     @Volatile
-    private var shutdownLatch: dorkbox.util.sync.CountDownLatch
+    private var shutdown = false
+
+    internal val shutdownInProgress = atomic(false)
+
+    @Volatile
+    internal var shutdownEventPoller = false
+
+    @Volatile
+    private var shutdownLatch = dorkbox.util.sync.CountDownLatch(0)
+
+    /**
+     * This is run in lock-step to shutdown/close the client/server event poller. Afterwards, connect/bind can be called again
+     */
+    @Volatile
+    internal var pollerClosedLatch = dorkbox.util.sync.CountDownLatch(0)

     /**
      * Returns the storage used by this endpoint. This is the backing data structure for key/value pairs, and can be a database, file, etc
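
Replacing the single shutdown "barrier" with an atomicfu flag enables an idempotent-shutdown pattern that is used later in this commit. A minimal sketch of how such a guard behaves (names are illustrative, not this class's real fields):

    import kotlinx.atomicfu.atomic

    class ShutdownGuard {
        private val shutdownInProgress = atomic(false)

        fun shutdown() {
            // getAndSet(true) atomically returns the PREVIOUS value. Only the first
            // caller observes `false`, so the teardown logic runs exactly once even
            // if shutdown() is invoked concurrently from several places.
            val previouslyStarted = shutdownInProgress.getAndSet(true)
            if (previouslyStarted) {
                return // someone else is already shutting down
            }
            // ... one-time teardown work goes here ...
        }
    }
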
@@ -182,7 +202,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
     private val pingManager = PingManager<CONNECTION>()

     init {
         if (DEBUG_CONNECTIONS) {
             logger.error { "DEBUG_CONNECTIONS is enabled. This should not happen in release!" }
@@ -190,10 +209,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         config.validate() // this happens more than once! (this is ok)

-        // there are threading issues if there are client(s) and server's within the same JVM, where we have thread starvation
-        networkEventPoller.configure(logger, config)

         // serialization stuff
         @Suppress("UNCHECKED_CAST")
         serialization = config.serialization as Serialization<CONNECTION>
@@ -216,14 +231,33 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         // we have to be able to specify the property store
         storage = SettingsStore(config.settingsStore, logger)

         crypto = CryptoManagement(logger, storage, type, config.enableRemoteSignatureValidation)
-        uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate()

         // Only starts the media driver if we are NOT already running!
+        // NOTE: in the event that we are IPC -- only ONE SERVER can be running IPC at a time for a single driver!
+        if (type == Server::class.java && config.enableIpc) {
+            runBlocking {
+                val configuration = config.copy()
+                if (AeronDriver.isLoaded(configuration, logger)) {
+                    val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change its directory: ${configuration.aeronDirectory}")
+                    logger.error("Error initializing server", e)
+                    throw e
+                }
+
+                if (AeronDriver.isRunning(configuration, logger)) {
+                    val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change its directory: ${configuration.aeronDirectory}")
+                    logger.error("Error initializing server", e)
+                    throw e
+                }
+            }
+        }
+
         try {
             @Suppress("LeakingThis")
             aeronDriver = AeronDriver(this)
         } catch (e: Exception) {
-            logger.error("Error initializing endpoint", e)
+            logger.error("Error initializing server", e)
             throw e
         }
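
The new guard exists because an Aeron media driver is anchored to a directory of shared-memory files, so two IPC servers sharing one directory would collide. A hypothetical configuration sketch that sidesteps the ServerException (the property names mirror those used in this diff; the exact Configuration API is an assumption):

    import java.io.File

    // each server gets its own driver directory, so the isLoaded()/isRunning() checks pass
    val config = dorkbox.network.ServerConfiguration()
    config.enableIpc = true
    config.aeronDirectory = File("/tmp/aeron-server-A") // unique per server instance
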
@@ -243,6 +277,18 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         }
     }

+    internal fun isServer(function: Server<CONNECTION>.() -> Unit) {
+        if (type == Server::class.java) {
+            function(this as Server<CONNECTION>)
+        }
+    }
+
+    internal fun isClient(function: Client<CONNECTION>.() -> Unit) {
+        if (type == Client::class.java) {
+            function(this as Client<CONNECTION>)
+        }
+    }
+
     /**
      * Make sure that the different dispatchers are currently active.
      *
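
The two helpers added above give internal code a receiver-scoped way to run type-specific logic. A hypothetical call-site sketch (these are internal functions, so callers live inside the library):

    // inside library code that holds an EndPoint<CONNECTION> reference:
    endpoint.isServer {
        // `this` is Server<CONNECTION> here, so server-only members are visible
        logger.info { "running server-only cleanup" }
    }

    endpoint.isClient {
        // `this` is Client<CONNECTION> here
        logger.info { "running client-only cleanup" }
    }
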
@@ -257,14 +303,17 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
      *
      * The client calls this every time it attempts a connection.
      */
-    internal fun initializeLatch() {
-        if (shutdownLatch.count == 0) {
-            shutdownLatch.countUp()
-        }
-
-        if (canCloseLatch.count == 0) {
-            canCloseLatch.countUp()
-        }
+    internal fun initializeState() {
+        shutdownLatch = dorkbox.util.sync.CountDownLatch(1)
+        pollerClosedLatch = dorkbox.util.sync.CountDownLatch(1)
+
+        endpointIsRunning.lazySet(true)
+        shutdown = false
+        shutdownEventPoller = false
+        aeronDriver.resetUdpSessionId()
+
+        // there are threading issues if there are client(s) and servers within the same JVM, where we have thread starvation
+        // this resolves the problem. Additionally, this is tied to a specific endpoint instance
+        networkEventPoller.configure(logger, config, this)
     }

    /**
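
initializeState() swaps in fresh latches rather than resetting them because a count-down latch cannot be reused once it reaches zero. A JDK-latch sketch of the lifecycle (the real code uses dorkbox.util.sync.CountDownLatch, a suspending variant; this only shows the shape of the idea):

    import java.util.concurrent.CountDownLatch

    class CloseLifecycle {
        // starts at 0 = "already closed", so waiting on a never-started
        // endpoint returns immediately
        @Volatile var shutdownLatch = CountDownLatch(0)

        fun initializeState() {
            shutdownLatch = CountDownLatch(1) // arm a FRESH latch on every connect/bind
        }

        fun close() {
            shutdownLatch.countDown()         // release anyone blocked in waitForClose()
        }

        fun waitForClose() {
            shutdownLatch.await()             // blocks only while the endpoint is running
        }
    }
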
@@ -292,8 +341,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
      * Stops the network driver.
      *
      * @param forceTerminate if true, then there is no caution when restarting the Aeron driver, and any other process on the machine using
-     * the same driver will probably crash (unless they have been appropriately stopped). If false (the default), then the Aeron driver is
-     * only stopped if it is safe to do so
+     * the same driver will probably crash (unless they have been appropriately stopped).
+     * If false, then the Aeron driver is only stopped if it is safe to do so
      */
     suspend fun stopDriver(forceTerminate: Boolean = false) {
         if (forceTerminate) {
@@ -344,6 +393,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
      *
      * NOTE: This callback is executed IN-LINE with network IO, so one must be very careful about what is executed.
      *
+     * Things that happen in this event are TIME-CRITICAL, and must happen before anything else. If you block here, you will block network IO
+     *
      * For a server, this function will be called for ALL client connections.
      */
     fun onInit(function: suspend CONNECTION.() -> Unit){
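
Given the new TIME-CRITICAL warning, a hypothetical listener registration shows what does and does not belong in onInit:

    endpoint.onInit {
        // fine: cheap, non-blocking bookkeeping
        logger.info { "connection initialized: $this" }

        // NOT fine: this callback runs in-line with network IO, so any blocking
        // call here (sleeps, synchronous DB lookups, long computation) stalls the poller
        // Thread.sleep(5_000)
    }
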
@@ -502,20 +553,20 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
             // the remote endPoint will send this message if it is closing the connection.
             // IF we get this message in time, then we do not have to wait for the connection to expire before closing it
             is DisconnectMessage -> {
-                // NOTE: This MUST be on a new co-routine
-                EventDispatcher.CLOSE.launch {
-                    connection.close(enableRemove = true)
+                // NOTE: This MUST be on a new co-routine (this is...)
+                runBlocking {
+                    connection.close()
                 }
             }

             is Ping -> {
                 // PING will also measure APP latency, not just NETWORK PIPE latency
-                // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RMI!)
+                // NOTE: This MUST be on a new co-routine, specifically the messageDispatch
                 messageDispatch.launch {
                     try {
                         pingManager.manage(connection, responseManager, message, logger)
                     } catch (e: Exception) {
-                        listenerManager.notifyError(connection, e)
+                        listenerManager.notifyError(connection, PingException("Error while processing Ping message", e))
                     }
                 }
             }
@@ -526,12 +577,12 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
             is RmiMessage -> {
                 // if we are an RMI message/registration, we have very specific, defined behavior.
                 // We do not use the "normal" listener callback pattern because this requires special functionality
-                // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RMI!)
+                // NOTE: This MUST be on a new co-routine, specifically the messageDispatch (it IS NOT the EventDispatch.RESPONSE_MANAGER!)
                 messageDispatch.launch {
                     try {
                         rmiGlobalSupport.processMessage(serialization, connection, message, rmiConnectionSupport, responseManager, logger)
                     } catch (e: Exception) {
-                        listenerManager.notifyError(connection, e)
+                        listenerManager.notifyError(connection, RMIException("Error while processing RMI message", e))
                     }
                 }
             }
@@ -552,8 +603,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
                     try {
                         streamingManager.processDataMessage(message, this@EndPoint, connection)
                     } catch (e: Exception) {
-                        val newException = StreamingException("Error processing StreamingMessage", e)
-                        listenerManager.notifyError(connection, newException)
+                        listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
                     }
                 }
@@ -568,17 +618,16 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
                         hasListeners = hasListeners or connection.notifyOnMessage(message)

                         if (!hasListeners) {
-                            logger.error("No message callbacks found for ${message::class.java.name}")
+                            listenerManager.notifyError(connection, MessageDispatchException("No message callbacks found for ${message::class.java.name}"))
                         }
                     } catch (e: Exception) {
-                        logger.error("Error processing message ${message::class.java.name}", e)
-                        listenerManager.notifyError(connection, e)
+                        listenerManager.notifyError(connection, MessageDispatchException("Error processing message ${message::class.java.name}", e))
                     }
                 }
             }

             else -> {
-                logger.error("Unknown message received!!")
+                listenerManager.notifyError(connection, MessageDispatchException("Unknown message received!!"))
             }
         }
     }
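
These hunks are the heart of the cleanup: bare logger.error calls and raw exceptions are replaced by notifyError with typed wrappers (PingException, RMIException, StreamingException, MessageDispatchException). A hypothetical error listener shows the payoff, branching on the failure domain instead of string-matching log output (this assumes an onError-style listener of the shape the library exposes):

    endpoint.onError { e ->
        when (e) {
            is PingException            -> logger.warn(e)  { "ping handling failed" }
            is RMIException             -> logger.error(e) { "RMI dispatch failed" }
            is StreamingException       -> logger.error(e) { "stream reassembly failed" }
            is MessageDispatchException -> logger.error(e) { "message dispatch failed" }
            else                        -> logger.error(e) { "unexpected error" }
        }
    }
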
@@ -601,7 +650,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         try {
             // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
             val message = readKryo.read(buffer, offset, length, connection)

             logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
             processMessage(message, connection)
         } catch (e: Exception) {
@@ -811,25 +860,35 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
         return shutdown
     }

     /**
      * Waits for this endpoint to be closed
      */
-    suspend fun waitForClose() {
-        if (!shutdown) {
-            // we're not shutdown, so don't even bother waiting
-            return
-        }
-
-        while (latch !== shutdownLatch) {
-            latch = shutdownLatch
-            // if we are restarting the network state, we want to continue to wait for a proper close event. Because we RESET the latch,
-            // we must continue to check
-
-            logger.error { "waiting for close event to finish" }
-            latch.await()
-        }
-    }
+    suspend fun waitForClose(): Boolean {
+        return waitForClose(0L)
+    }
+
+    /**
+     * Waits for this endpoint to be closed.
+     *
+     * @return true if the wait completed before the timeout
+     */
+    suspend fun waitForClose(timeoutMS: Long = 0L): Boolean {
+        logger.error { "WAITING FOR CLOSE $timeoutMS endpoint" }
+        // if we are restarting the network state, we want to continue to wait for a proper close event.
+        // when shutting down, it can take up to 5 seconds to fully register as "shutdown"
+
+        return if (timeoutMS > 0) {
+            pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) && shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
+        } else {
+            pollerClosedLatch.await()
+            logger.error { "waiting for shutdown" }
+            shutdownLatch.await()
+            true
+        }
+    }
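
A caller-side sketch of the new timeout overload (hypothetical usage; waitForClose is a suspend function, so it must run inside a coroutine):

    runBlocking {
        // returns false if the poller and shutdown latches did not both release in time
        val closedCleanly = endpoint.waitForClose(timeoutMS = 5_000)
        if (!closedCleanly) {
            logger.warn { "endpoint did not finish closing within 5 seconds" }
        }
    }
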
     /**
      * Shall we preserve state when we shutdown, or do we remove all onConnect/Disconnect/etc events from memory.
      *
@@ -837,98 +896,97 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
      * 1) We should reset 100% of the state+events, so that every time we connect, everything is redone
      * 2) We preserve the state+event, BECAUSE adding the onConnect/Disconnect/message event states might be VERY expensive.
      *
-     * If we call "close" multiple times, we want to run the requested logic + onCloseFunction EACH TIME IT'S CALLED!
+     * NOTE: This method does NOT block, as the connection state is asynchronous. Use "waitForClose()" to wait for this to finish
+     *
+     * @param clientConnectionDC this is only true when a connection is closed in the client.
      */
-    suspend fun shutdown(shutdownEndpoint: Boolean = false, onCloseFunction: () -> Unit) {
-        // we must set the shutdown state immediately
-        shutdown = true
-
-        EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
-            logger.info { "Shutting down..." }
-            // if we were ALREADY shutdown, just run the appropriate logic again (but not ALL the logic)
-
-            if (!shutdownEndpoint) {
-                closeAction()
-            } else {
-                closeAction {
-                    // Connections MUST be closed first, because we want to make sure that no RMI messages can be received
-                    // when we close the RMI support objects (in which case, weird - but harmless - errors show up)
-                    // this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method)
-                    responseManager.close()
-
-                    // Clears out all registered events
-                    listenerManager.close()
-
-                    // Remove from memory the data from the back-end storage
-                    storage.close()
-
-                    // only on the server, this shuts down the poll/event dispatchers
-                    // This cannot be called from the event dispatcher!
-                    close0()
-
-                    if (this is Server<*>) {
-                        try {
-                            // make sure that we have de-allocated all connection data
-                            AeronDriver.checkForMemoryLeaks()
-                            handshake.checkForMemoryLeaks()
-                        } catch (e: AllocationException) {
-                            logger.error(e) { "Error during server cleanup" }
-                        }
-                    }
-                }
-            }
-
-            onCloseFunction()
-            logger.info { "Done shutting down..." }
-
-            // if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
-            shutdownLatch.countDown()
-            canCloseLatch.countDown()
-        }
-
-        // when the endpoint is closed, we must wait until after ALL CLOSE events are called!
-        logger.info { "Waiting for close listener to finish, then shutting down..." + EventDispatcher.getExecutingDispatchEvent() }
-        canCloseLatch.await()
-    }
-
-    private suspend fun closeAction(extraActions: suspend () -> Unit = {}) {
-        // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
-        // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
-        val enableRemove = type == Client::class.java
-        connections.forEach {
-            logger.info { "[${it}] Closing connection" }
-            it.close(enableRemove)
-        }
-
-        logger.error { "CLOSE ACTION 2" }
-
-        // must run after connections have been closed, but before anything else happens
-        extraActions()
-
-        logger.error { "CLOSE ACTION 3" }
-
-        close0()
-
-        logger.error { "CLOSE ACTION 4" }
-
-        // this will ONLY close the event dispatcher if ALL endpoints have closed it.
-        // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed.
-        networkEventPoller.close(logger)
-
-        aeronDriver.close()
-
-        messageDispatch.cancel("${type.simpleName} shutting down")
-
-        // if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
-        shutdownLatch.countDown()
-    }
-
-    internal open suspend fun close0() {}
+    internal suspend fun closeSuspending(clientConnectionDC: Boolean = false) {
+        // 1) endpoints can call close()
+        // 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually)
+        val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true)
+        if (shutdownPreviouslyStarted && clientConnectionDC) {
+            // this is only called when the client network event poller shuts down
+            // if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely)
+
+            // Clears out all registered events
+            listenerManager.close()
+
+            // Remove from memory the data from the back-end storage
+            storage.close()
+
+            aeronDriver.close()
+
+            // don't do anything more, since we've already shutdown!
+            return
+        }
+
+        logger.error { "Requesting endpoint shutdown for ${type.simpleName} shutdownPreviouslyStarted=$shutdownPreviouslyStarted" }
+
+        EventDispatcher.CLOSE.launch {
+            logger.debug { "Shutting down endpoint..." }
+
+            // always do this. It is OK to run this multiple times
+            // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
+            // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
+            logger.error("CLOSING CONNECTIONS FROM CLOSE EVENT: ${connections.size()}")
+            connections.forEach {
+                it.closeImmediately()
+            }
+
+            // don't do these things if we are "closed" from a client connection disconnect
+            if (!clientConnectionDC && !shutdownPreviouslyStarted) {
+                // THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER!
+                shutdownEventPoller = true
+                // if we close the poller AND listener manager too quickly, events will not get published
+                pollerClosedLatch.await()
+            }
+
+            // this will ONLY close the event dispatcher if ALL endpoints have closed it.
+            // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed.
+            networkEventPoller.close(logger, this)
+
+            // Connections MUST be closed first, because we want to make sure that no RMI messages can be received
+            // when we close the RMI support objects (in which case, weird - but harmless - errors show up)
+            // this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method)
+            responseManager.close()
+
+            // don't do these things if we are "closed" from a client connection disconnect
+            if (!clientConnectionDC) {
+                // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done
+                EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
+                    // Clears out all registered events
+                    listenerManager.close()
+
+                    // Remove from memory the data from the back-end storage
+                    storage.close()
+
+                    aeronDriver.close()
+
+                    // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on
+                    shutdown = true
+                    shutdownLatch.countDown()
+                    shutdownInProgress.lazySet(false)
+                    logger.info { "Done shutting down endpoint." }
+                }
+            } else {
+                // when the client connection is closed, we don't close the driver/etc.
+                shutdown = true
+                shutdownLatch.countDown()
+                shutdownInProgress.lazySet(false)
+                logger.info { "Done shutting down endpoint." }
+            }
+        }
+    }
+
+    /**
+     * Reset the running state when there's an error starting up
+     */
+    internal fun resetOnError() {
+        shutdownLatch.countDown()
+        pollerClosedLatch.countDown()
+        endpointIsRunning.lazySet(false)
+        shutdown = false
+        shutdownEventPoller = false
+    }
+
+    override fun toString(): String {
+        return "EndPoint [${type.simpleName}] $uuid"
+    }

     override fun hashCode(): Int {
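
Taken together, the new shutdown contract is: closeSuspending() only schedules the teardown on the CLOSE dispatcher and returns, and completion is observed through the latches. A hypothetical end-to-end sequence from calling code (the public close() entry point is assumed to delegate to closeSuspending()):

    runBlocking {
        client.close()          // schedules the async teardown; does not block
        client.waitForClose()   // blocks until pollerClosedLatch and shutdownLatch release
    }
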