Fixed shutdown performance/bug

This commit is contained in:
Robinson 2021-07-26 20:16:10 +02:00
parent 8111fc3d3d
commit 7d2263b47b

View File

@ -63,7 +63,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
abstract class BaseTest { abstract class BaseTest {
private val lock = Any() @Volatile
private var latch = CountDownLatch(1) private var latch = CountDownLatch(1)
@Volatile @Volatile
@ -100,6 +100,8 @@ abstract class BaseTest {
} }
fun setLogLevel(level: Level) { fun setLogLevel(level: Level) {
println("Log level: $level")
// assume SLF4J is bound to logback in the current environment // assume SLF4J is bound to logback in the current environment
val rootLogger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger val rootLogger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
rootLogger.level = level rootLogger.level = level
@ -127,13 +129,13 @@ abstract class BaseTest {
consoleAppender.encoder = encoder consoleAppender.encoder = encoder
consoleAppender.start() consoleAppender.start()
rootLogger.addAppender(consoleAppender) rootLogger.addAppender(consoleAppender)
//
// context.getLogger(Server::class.simpleName).trace("TESTING") // context.getLogger(Server::class.simpleName).trace("TESTING")
// context.getLogger(Client::class.simpleName).trace("TESTING") // context.getLogger(Client::class.simpleName).trace("TESTING")
} }
// wait minimum of 2 minutes before we automatically fail the unit test. // wait minimum of 2 minutes before we automatically fail the unit test.
var AUTO_FAIL_TIMEOUT: Long = 120 var AUTO_FAIL_TIMEOUT: Long = 120L
init { init {
if (OS.javaVersion >= 9) { if (OS.javaVersion >= 9) {
@ -177,19 +179,19 @@ abstract class BaseTest {
// we must always make sure that aeron is shut-down before starting again. // we must always make sure that aeron is shut-down before starting again.
while (Server.isRunning(serverConfig())) { while (Server.isRunning(serverConfig())) {
println("Aeron was still running. Waiting for it to stop...") println("Aeron was still running. Waiting for it to stop...")
Thread.sleep(2000) sleep(2000)
} }
} }
fun addEndPoint(endPointConnection: EndPoint<*>) { fun addEndPoint(endPointConnection: EndPoint<*>) {
endPointConnections.add(endPointConnection) endPointConnections.add(endPointConnection)
synchronized(lock) { latch = CountDownLatch(endPointConnections.size + 1) } latch = CountDownLatch(endPointConnections.size + 1)
} }
/** /**
* Immediately stop the endpoints * Immediately stop the endpoints
*/ */
fun stopEndPoints(stopAfterMillis: Long = 0) { fun stopEndPoints(stopAfterMillis: Long = 0L) {
if (isStopping) { if (isStopping) {
return return
} }
@ -197,23 +199,31 @@ abstract class BaseTest {
// not the best, but this works for our purposes. This is a TAD hacky, because we ALSO have to make sure that we // not the best, but this works for our purposes. This is a TAD hacky, because we ALSO have to make sure that we
// ARE NOT in the same thread group as netty! // ARE NOT in the same thread group as netty!
sleep(stopAfterMillis) if (stopAfterMillis > 0L) {
sleep(stopAfterMillis)
}
synchronized(lock) {} // we start with "1", so make sure adjust if we want an accurate count
println("Shutting down ${endPointConnections.size} (${latch.count - 1}) endpoints...")
val remainingConnections = mutableListOf<EndPoint<*>>()
// shutdown clients first // shutdown clients first
for (endPoint in endPointConnections) { endPointConnections.forEach { endPoint ->
if (endPoint is Client) { if (endPoint is Client) {
endPoint.close() endPoint.close()
latch.countDown() latch.countDown()
println("Done with ${endPoint.type.simpleName}")
} else {
remainingConnections.add(endPoint)
} }
} }
// shutdown servers last
for (endPoint in endPointConnections) { // shutdown everything else (should only be servers) last
if (endPoint is Server) { println("Shutting down ${remainingConnections.size} (${latch.count - 1}) endpoints...")
endPoint.close() remainingConnections.forEach {
latch.countDown() it.close()
} latch.countDown()
} }
// we start with "1", so make sure to end it // we start with "1", so make sure to end it
@ -228,7 +238,6 @@ abstract class BaseTest {
* @param stopAfterSeconds how many seconds to wait, the default is 2 minutes. * @param stopAfterSeconds how many seconds to wait, the default is 2 minutes.
*/ */
fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT) { fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT) {
synchronized(lock) {}
try { try {
if (stopAfterSeconds == 0L) { if (stopAfterSeconds == 0L) {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS) latch.await(Long.MAX_VALUE, TimeUnit.SECONDS)