Cleaned up comments and logging

This commit is contained in:
Robinson 2023-06-25 12:13:52 +02:00
parent 119870bdc8
commit 4b511b2615
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 18 additions and 17 deletions

View File

@ -206,8 +206,14 @@ open class Server<CONNECTION : Connection>(
fun bind() = runBlocking {
// NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
if (bindAlreadyCalled.getAndSet(true)) {
logger.error { "Unable to bind when the server is already running!" }
// the lifecycle of a server is the ENDPOINT (measured via the network event poller)
if (endpointIsRunning.value) {
listenerManager.notifyError(ServerException("Unable to start, the server is already running!"))
return@runBlocking
}
if (!waitForClose()) {
listenerManager.notifyError(ServerException("Unable to start the server!"))
return@runBlocking
}
@ -216,7 +222,8 @@ open class Server<CONNECTION : Connection>(
verifyState()
initializeLatch()
} catch (e: Exception) {
logger.error(e) { "Unable to start the network driver" }
resetOnError()
listenerManager.notifyError(ServerException("Unable to start the server!", e))
return@runBlocking
}

View File

@ -239,14 +239,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
val configuration = config.copy()
if (AeronDriver.isLoaded(configuration, logger)) {
val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change it's directory: ${configuration.aeronDirectory}")
logger.error("Error initializing server", e)
listenerManager.notifyError(e)
throw e
}
if (AeronDriver.isRunning(configuration, logger)) {
val e = ServerException("Only one server at a time can share a single aeron driver! Make the driver unique or change it's directory: ${configuration.aeronDirectory}")
logger.error("Error initializing server", e)
listenerManager.notifyError(e)
throw e
}
}
@ -257,7 +257,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
@Suppress("LeakingThis")
aeronDriver = AeronDriver(this)
} catch (e: Exception) {
logger.error("Error initializing server", e)
listenerManager.notifyError(Exception("Error initializing endpoint", e))
throw e
}
@ -329,8 +329,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
try {
aeronDriver = AeronDriver(this)
} catch (e: Exception) {
logger.error("Error initializing aeron driver", e)
throw e
throw newException("Error initializing aeron driver", e)
}
}
@ -593,6 +592,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped chunks of data.
is StreamingControl -> {
// TODO: this can be the streaming kryo???
streamingManager.processControlMessage(message, readKryo,this@EndPoint, connection)
}
is StreamingData -> {
@ -654,9 +654,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
processMessage(message, connection)
} catch (e: Exception) {
// The handshake sessionId IS NOT globally unique
logger.error("[${header.sessionId()}] Error de-serializing message", e)
listenerManager.notifyError(connection, e)
listenerManager.notifyError(connection, newException("Error de-serializing message", e))
}
}
@ -705,7 +703,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/
if (result == Publication.NOT_CONNECTED) {
if (abortEarly) {
logger.error { "[${publication.sessionId()}] Error sending message. (Connection in non-connected state, aborted attempt! ${errorCodeName(result)})" }
listenerManager.notifyError(newException("[${publication.sessionId()}] Error sending message. (Connection in non-connected state, aborted attempt! ${errorCodeName(result)})"))
return false
}
@ -874,7 +872,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* @return true if the wait completed before the timeout
*/
suspend fun waitForClose(timeoutMS: Long = 0L): Boolean {
logger.error { "WAITING FOR CLOSE $timeoutMS endpoint" }
// if we are restarting the network state, we want to continue to wait for a proper close event.
// when shutting down, it can take up to 5 seconds to fully register as "shutdown"
@ -882,7 +879,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) && shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
} else {
pollerClosedLatch.await()
logger.error { "waiting for shutdown" }
shutdownLatch.await()
true
}
@ -920,15 +916,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
return
}
logger.error { "Requesting endpoint shutdown for ${type.simpleName} shutdownPreviouslyStarted=$shutdownPreviouslyStarted" }
EventDispatcher.CLOSE.launch {
logger.debug { "Shutting down endpoint..." }
// always do this. It is OK to run this multiple times
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
logger.error("CLOSING CONNECTIONS FROM CLOSE EVENT: ${connections.size()}")
logger.trace { "Closing ${connections.size()} via the close event" }
connections.forEach {
it.closeImmediately()
}