Now properly waits for event dispatcher to shutdown in unit tests

This commit is contained in:
Robinson 2023-12-04 13:36:47 +01:00
parent 6bbf62f886
commit 6f50618040
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 43 additions and 5 deletions

View File

@ -1079,6 +1079,18 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
return networkEventPoller.isDispatch() return networkEventPoller.isDispatch()
} }
/**
* Shuts-down each event dispatcher executor, and waits for it to gracefully shutdown.
*
* Once shutdown, it cannot be restarted and the application MUST recreate the endpoint
*
* @param timeout how long to wait, must be > 0
* @param timeoutUnit what the unit count is
*/
fun shutdownEventDispatcher(timeout: Long = 15, timeoutUnit: TimeUnit = TimeUnit.SECONDS) {
logger.info("Waiting for Event Dispatcher to shutdown...")
eventDispatch.shutdownAndWait(timeout, timeoutUnit)
}
/** /**
* Reset the running state when there's an error starting up * Reset the running state when there's an error starting up

View File

@ -42,6 +42,10 @@ internal class EventDispatcher(val type: String) {
fun isDispatch(): Boolean { fun isDispatch(): Boolean {
return dispatcher.isDispatch(type) return dispatcher.isDispatch(type)
} }
fun shutdownAndWait(timeout: Long, timeoutUnit: TimeUnit) {
dispatcher.shutdownAndWait(type, timeout, timeoutUnit)
}
} }
companion object { companion object {
@ -75,8 +79,6 @@ internal class EventDispatcher(val type: String) {
) )
}.toTypedArray() }.toTypedArray()
val HANDSHAKE: ED val HANDSHAKE: ED
val CONNECT: ED val CONNECT: ED
val ERROR: ED val ERROR: ED
@ -97,6 +99,21 @@ internal class EventDispatcher(val type: String) {
} }
/**
* Shuts-down each event dispatcher executor, and waits for it to gracefully shutdown. Once shutdown, it cannot be restarted.
*
* @param timeout how long to wait
* @param timeoutUnit what the unit count is
*/
fun shutdownAndWait(timeout: Long, timeoutUnit: TimeUnit) {
require(timeout > 0) { logger.error("The EventDispatcher shutdown timeout must be > 0!") }
HANDSHAKE.shutdownAndWait(timeout, timeoutUnit)
CONNECT.shutdownAndWait(timeout, timeoutUnit)
ERROR.shutdownAndWait(timeout, timeoutUnit)
CLOSE.shutdownAndWait(timeout, timeoutUnit)
}
/** /**
* Checks if the current execution thread is running inside one of the event dispatchers. * Checks if the current execution thread is running inside one of the event dispatchers.
*/ */
@ -122,9 +139,17 @@ internal class EventDispatcher(val type: String) {
} }
/** /**
* Each event type runs inside its own coroutine dispatcher. * shuts-down the current execution thread and waits for it complete.
*/
private fun shutdownAndWait(type: EDType, timeout: Long, timeoutUnit: TimeUnit) {
executors[type.ordinal].shutdown()
executors[type.ordinal].awaitTermination(timeout, timeoutUnit)
}
/**
* Each event type runs inside its own thread executor.
* *
* We want EACH event type to run in its own dispatcher... on its OWN thread, in order to prevent deadlocks * We want EACH event type to run in its own executor... on its OWN thread, in order to prevent deadlocks
* This is because there are blocking dependencies: DISCONNECT -> CONNECT. * This is because there are blocking dependencies: DISCONNECT -> CONNECT.
* *
* If an event is RE-ENTRANT, then it will immediately execute! * If an event is RE-ENTRANT, then it will immediately execute!

View File

@ -299,7 +299,6 @@ abstract class BaseTest {
endPoint.stopDriver() endPoint.stopDriver()
} }
// run actions before we actually shutdown, but after we wait // run actions before we actually shutdown, but after we wait
if (!successClients || !successServers) { if (!successClients || !successServers) {
Assert.fail("Shutdown latch not triggered ($successClients|$successServers)!") Assert.fail("Shutdown latch not triggered ($successClients|$successServers)!")
@ -308,6 +307,7 @@ abstract class BaseTest {
// we must always make sure that aeron is shut-down before starting again. // we must always make sure that aeron is shut-down before starting again.
clients.forEach { endPoint -> clients.forEach { endPoint ->
endPoint.ensureStopped() endPoint.ensureStopped()
endPoint.shutdownEventDispatcher() // once shutdown, it cannot be restarted!
if (!Client.ensureStopped(endPoint.config.copy())) { if (!Client.ensureStopped(endPoint.config.copy())) {
throw IllegalStateException("Unable to continue, AERON client was unable to stop.") throw IllegalStateException("Unable to continue, AERON client was unable to stop.")
@ -316,6 +316,7 @@ abstract class BaseTest {
servers.forEach { endPoint -> servers.forEach { endPoint ->
endPoint.ensureStopped() endPoint.ensureStopped()
endPoint.shutdownEventDispatcher() // once shutdown, it cannot be restarted!
if (!Client.ensureStopped(endPoint.config.copy())) { if (!Client.ensureStopped(endPoint.config.copy())) {
throw IllegalStateException("Unable to continue, AERON server was unable to stop.") throw IllegalStateException("Unable to continue, AERON server was unable to stop.")