pollerClosedLatch is now only created once we've fully started (prevents blocking forever when shutting down)

This commit is contained in:
Robinson 2023-09-08 13:21:12 +02:00
parent dafcc97eac
commit c69512eda4
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 34 additions and 7 deletions

View File

@ -195,6 +195,9 @@ open class Client<CONNECTION : Connection>(
@Volatile
private var slowDownForException = false
@Volatile
private var stopConnectOnShutdown = false
// is valid when there is a connection to the server, otherwise it is null
@Volatile
private var connection0: CONNECTION? = null
@ -516,6 +519,7 @@ open class Client<CONNECTION : Connection>(
// this also makes sure that the dispatchers are still active.
// Calling `client.close()` will shut down the dispatchers (and a new client instance must be created)
try {
stopConnectOnShutdown = false
startDriver()
initializeState()
} catch (e: Exception) {
@ -563,7 +567,7 @@ open class Client<CONNECTION : Connection>(
val startTime = System.nanoTime()
var success = false
while (connectionTimoutInNs == 0L || System.nanoTime() - startTime < connectionTimoutInNs) {
while (!stopConnectOnShutdown && (connectionTimoutInNs == 0L || System.nanoTime() - startTime < connectionTimoutInNs)) {
if (isShutdown()) {
resetOnError()
@ -626,6 +630,11 @@ open class Client<CONNECTION : Connection>(
// once we're done with the connection process, stop trying
break
} catch (e: ClientRetryException) {
if (stopConnectOnShutdown) {
aeronDriver.closeIfSingle()
break
}
val inSeconds = TimeUnit.NANOSECONDS.toSeconds(handshakeTimeoutNs)
val message = if (isIPC) {
"Unable to connect to IPC in $inSeconds seconds, retrying..."
@ -640,11 +649,15 @@ open class Client<CONNECTION : Connection>(
}
// maybe the aeron driver isn't running? (or isn't running correctly?)
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, stop it
slowDownForException = true
} catch (e: ClientRejectedException) {
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, stop it
if (stopConnectOnShutdown) {
break
}
slowDownForException = true
@ -661,6 +674,11 @@ open class Client<CONNECTION : Connection>(
}
} catch (e: Exception) {
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
if (stopConnectOnShutdown) {
break
}
listenerManager.notifyError(ClientException("[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting.", e))
resetOnError()
throw e
@ -670,6 +688,12 @@ open class Client<CONNECTION : Connection>(
if (!success) {
endpointIsRunning.lazySet(false)
if (stopConnectOnShutdown) {
val exception = ClientException("Client closed during connection attempt. Aborting connection attempts.")
listenerManager.notifyError(exception)
throw exception
}
if (System.nanoTime() - startTime < connectionTimoutInNs) {
val type = if (connection0 == null) {
@ -822,8 +846,9 @@ open class Client<CONNECTION : Connection>(
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls)
listenerManager.notifyInit(newConnection)
// have to make a new thread to listen for incoming data!
// SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
// if we shutdown/close before the poller starts, we don't want to block forever
pollerClosedLatch = CountDownLatch(1)
networkEventPoller.submit(
action = object : EventActionOperator {
override fun invoke(): Int {
@ -987,6 +1012,7 @@ open class Client<CONNECTION : Connection>(
* @param closeEverything if true, all parts of the client will be closed (listeners, driver, event polling, etc)
*/
fun close(closeEverything: Boolean = true) {
stopConnectOnShutdown = true
close(
closeEverything = closeEverything,
notifyDisconnect = true,

View File

@ -277,6 +277,8 @@ open class Server<CONNECTION : Connection>(
logger.info { ipcPoller.info }
logger.info { ipPoller.info }
// if we shutdown/close before the poller starts, we don't want to block forever
pollerClosedLatch = CountDownLatch(1)
networkEventPoller.submit(
action = object : EventActionOperator {
override fun invoke(): Int {

View File

@ -52,6 +52,7 @@ import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.concurrent.IdleStrategy
import org.agrona.concurrent.SigInt
import java.util.concurrent.*
@ -318,7 +319,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
internal fun initializeState() {
// on the first run, we depend on these to be 0
shutdownLatch = CountDownLatch(1)
pollerClosedLatch = CountDownLatch(1)
closeLatch = CountDownLatch(1)
endpointIsRunning.lazySet(true)
@ -928,7 +928,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
// this closes the endpoint specific instance running in the poller
// THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER!