Cleaned up/tweaked endpoint.close()

This commit is contained in:
Robinson 2023-08-09 21:35:02 -06:00
parent 3852677feb
commit 836c8abce6
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 96 additions and 36 deletions

View File

@ -487,8 +487,10 @@ open class Client<CONNECTION : Connection>(
return
}
// the lifecycle of a client is the ENDPOINT (measured via the network event poller) and CONNECTION (measure from connection closed)
if (!waitForClose()) {
// if we are reconnecting, then we do not want to wait for the ENDPOINT to close first!
if (!waitForEndpointShutdown()) {
if (endpointIsRunning.value) {
listenerManager.notifyError(ServerException("Unable to start, the client is already running!"))
} else {
@ -952,7 +954,9 @@ open class Client<CONNECTION : Connection>(
*/
fun close(closeEverything: Boolean = true) {
runBlocking {
close(closeEverything = closeEverything, initiatedByClientClose = false, initiatedByShutdown = false)
close(
closeEverything = closeEverything, releaseWaitingThreads = true
)
}
}

View File

@ -225,7 +225,7 @@ open class Server<CONNECTION : Connection>(
return@runBlocking
}
if (!waitForClose()) {
if (!waitForEndpointShutdown()) {
listenerManager.notifyError(ServerException("Unable to start the server!"))
return@runBlocking
}
@ -408,7 +408,9 @@ open class Server<CONNECTION : Connection>(
*/
fun close(closeEverything: Boolean = true) {
runBlocking {
close(closeEverything = closeEverything, initiatedByClientClose = false, initiatedByShutdown = false)
close(
closeEverything = closeEverything, releaseWaitingThreads = true
)
}
}

View File

@ -280,13 +280,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* Closes the connection, and removes all connection specific listeners
*/
suspend fun close() {
close(true)
close(sendDisconnectMessage = true,
notifyDisconnect = true)
}
/**
* Closes the connection, and removes all connection specific listeners
*/
internal suspend fun close(sendDisconnectMessage: Boolean) {
internal suspend fun close(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean) {
// there are 2 ways to call close.
// MANUALLY
// When a connection is disconnected via a timeout/expire.
@ -295,14 +296,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// make sure that EVERYTHING before "close()" runs before we do
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
closeImmediately(sendDisconnectMessage)
closeImmediately(sendDisconnectMessage, notifyDisconnect)
}
}
// connection.close() -> this
// endpoint.close() -> connection.close() -> this
internal suspend fun closeImmediately(sendDisconnectMessage: Boolean) {
internal suspend fun closeImmediately(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean) {
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
if (!isClosed.compareAndSet(expect = false, update = true)) {
return
@ -316,7 +317,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// notify the remote endPoint that we are closing
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
if (sendDisconnectMessage && publication.isConnected) {
logger.trace { "Sending disconnect message to remote endpoint" }
logger.trace { "Sending disconnect message to ${endPoint.otherTypeName}" }
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
send(DisconnectMessage.INSTANCE, true)
@ -333,7 +334,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val connection = this
endPoint.isServer {
endPoint.ifServer {
// clean up the resources associated with this connection when it's closed
logger.debug { "[${connection}] freeing resources" }
sessionIdAllocator.free(info.sessionIdPub)

View File

@ -261,20 +261,41 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
hook = Thread {
runBlocking {
close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = true)
close(
closeEverything = true, releaseWaitingThreads = true
)
}
}
Runtime.getRuntime().addShutdownHook(hook)
}
internal fun isServer(function: Server<CONNECTION>.() -> Unit) {
internal val typeName: String
get() {
return if (type == Server::class.java) {
"server"
} else {
"client"
}
}
internal val otherTypeName: String
get() {
return if (type == Server::class.java) {
"client"
} else {
"server"
}
}
internal fun ifServer(function: Server<CONNECTION>.() -> Unit) {
if (type == Server::class.java) {
function(this as Server<CONNECTION>)
}
}
internal fun isClient(function: Client<CONNECTION>.() -> Unit) {
internal fun ifClient(function: Client<CONNECTION>.() -> Unit) {
if (type == Client::class.java) {
function(this as Client<CONNECTION>)
}
@ -298,6 +319,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// on the first run, we depend on these to be 0
shutdownLatch = dorkbox.util.sync.CountDownLatch(1)
pollerClosedLatch = dorkbox.util.sync.CountDownLatch(1)
closeLatch = dorkbox.util.sync.CountDownLatch(1)
endpointIsRunning.lazySet(true)
shutdown = false
shutdownEventPoller = false
@ -567,8 +590,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
is DisconnectMessage -> {
// NOTE: This MUST be on a new co-routine (this is...)
runBlocking {
connection.close(false)
logger.debug { "Received disconnect message from $otherTypeName" }
connection.close(sendDisconnectMessage = false,
notifyDisconnect = true)
}
}
@ -759,6 +783,22 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
return shutdown
}
/**
* Waits for this endpoint to be internally shutdown, but not 100% fully closed (which only happens manually)
*
* @return true if the wait completed before the timeout
*/
internal suspend fun waitForEndpointShutdown(timeoutMS: Long = 0L): Boolean {
return if (timeoutMS > 0) {
pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) &&
shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
} else {
pollerClosedLatch.await()
shutdownLatch.await()
true
}
}
/**
* Waits for this endpoint to be closed
@ -768,7 +808,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
/**
* Waits for this endpoint to be closed.
* Waits for this endpoint to be fully closed. A disconnect from the network (or remote endpoint) will not signal this to continue.
*
* @return true if the wait completed before the timeout
*/
@ -776,13 +816,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// if we are restarting the network state, we want to continue to wait for a proper close event.
// when shutting down, it can take up to 5 seconds to fully register as "shutdown"
return if (timeoutMS > 0) {
pollerClosedLatch.await(timeoutMS, TimeUnit.MILLISECONDS) && shutdownLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
val success = if (timeoutMS > 0) {
closeLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
} else {
pollerClosedLatch.await()
shutdownLatch.await()
closeLatch.await()
true
}
return success
}
@ -799,10 +840,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/
internal suspend fun close(
closeEverything: Boolean,
initiatedByClientClose: Boolean,
initiatedByShutdown: Boolean)
releaseWaitingThreads: Boolean)
{
logger.debug { "Requesting close: closeEverything=$closeEverything, initiatedByClientClose=$initiatedByClientClose, initiatedByShutdown=$initiatedByShutdown" }
logger.debug { "Requesting close: closeEverything=$closeEverything, releaseWaitingThreads=$releaseWaitingThreads" }
// 1) endpoints can call close()
// 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually)
@ -824,10 +864,11 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
return
}
if (!shutdownPreviouslyStarted && !initiatedByShutdown) {
if (!shutdownPreviouslyStarted && Thread.currentThread() != hook) {
try {
Runtime.getRuntime().removeShutdownHook(hook)
} catch (ignored: Exception) {
} catch (ignored: RuntimeException) {
}
}
@ -837,23 +878,28 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// always do this. It is OK to run this multiple times
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
logger.trace { "Closing ${connections.size()} via the close event" }
connections.forEach {
it.closeImmediately(true)
it.closeImmediately(sendDisconnectMessage = true,
notifyDisconnect = true)
}
// don't do these things if we are "closed" from a client connection disconnect
if (closeEverything && !initiatedByClientClose) {
// THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER!
shutdownEventPoller = true
// if we close the poller AND listener manager too quickly, events will not get published
pollerClosedLatch.await()
}
// this closes the endpoint specific instance running in the poller
// THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER!
shutdownEventPoller = true
// if we close the poller AND listener manager too quickly, events will not get published
pollerClosedLatch.await()
// this will ONLY close the event dispatcher if ALL endpoints have closed it.
// when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed.
networkEventPoller.close(logger, this)
// Connections MUST be closed first, because we want to make sure that no RMI messages can be received
// when we close the RMI support objects (in which case, weird - but harmless - errors show up)
// this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method)
@ -870,15 +916,22 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// Remove from memory the data from the back-end storage
storage.close()
aeronDriver.close()
}
aeronDriver.close()
shutdown = true
// the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on
shutdown = true
shutdownLatch.countDown()
shutdownInProgress.lazySet(false)
logger.info { "Done shutting down endpoint." }
if (releaseWaitingThreads) {
logger.trace { "Counting down the close latch..." }
closeLatch.countDown()
}
logger.info { "Done shutting down the endpoint." }
}
}
}