Fixed/cleaned up connection polling and restarts
parent
ba57447169
commit
0c4c442b3a
|
@ -130,6 +130,15 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
var connectionTimeoutSec: Int = 0
|
||||
private set
|
||||
|
||||
/**
|
||||
* - if the client is internally going to reconnect (because of a network error)
|
||||
* - we have specified that we will run the disconnect logic
|
||||
* - there is reconnect logic in the disconnect handler
|
||||
*
|
||||
* Then ultimately, we want to ignore the disconnect-handler reconnect (we do not want to have multiple reconnects happening concurrently)
|
||||
*/
|
||||
@Volatile
|
||||
private var autoReconnect = false
|
||||
|
||||
private val handshake = ClientHandshake(this, logger)
|
||||
|
||||
|
@ -165,6 +174,15 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
*/
|
||||
@Suppress("DuplicatedCode")
|
||||
fun reconnect() {
|
||||
if (autoReconnect) {
|
||||
// we must check if we should permit a MANUAL reconnect, because the auto-reconnect MIGHT ALSO re-connect!
|
||||
|
||||
// autoReconnect will be "reset" when the connection closes. If in a happy state, then a manual reconnect is permitted.
|
||||
logger.info("Ignoring reconnect, auto-reconnect is in progress")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
if (connectionTimeoutSec == 0) {
|
||||
logger.info("Reconnecting...")
|
||||
} else {
|
||||
|
@ -509,6 +527,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
val startTime = System.nanoTime()
|
||||
var success = false
|
||||
while (!stopConnectOnShutdown && (connectionTimoutInNs == 0L || System.nanoTime() - startTime < connectionTimoutInNs)) {
|
||||
|
||||
if (isShutdown()) {
|
||||
resetOnError()
|
||||
|
||||
|
@ -639,11 +658,12 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
if (stopConnectOnShutdown) {
|
||||
val exception = ClientException("Client closed during connection attempt. Aborting connection attempts.").cleanStackTrace(3)
|
||||
listenerManager.notifyError(exception)
|
||||
// if we are waiting for this connection to connect (on a different thread, for example), make sure to release it.
|
||||
closeLatch.countDown()
|
||||
throw exception
|
||||
}
|
||||
|
||||
if (System.nanoTime() - startTime < connectionTimoutInNs) {
|
||||
|
||||
val type = if (isIPC) {
|
||||
"IPC"
|
||||
} else {
|
||||
|
@ -703,7 +723,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
// NOTE: this can change depending on what the server specifies!
|
||||
// should we queue messages during a reconnect? This is important if the client/server connection is unstable
|
||||
if (connectionInfo.enableSession && sessionManager is SessionManagerNoOp) {
|
||||
sessionManager = SessionManagerFull(config, aeronDriver, connectionInfo.sessionTimeout)
|
||||
sessionManager = SessionManagerFull(config, listenerManager as ListenerManager<SessionConnection>, aeronDriver, connectionInfo.sessionTimeout)
|
||||
} else if (!connectionInfo.enableSession && sessionManager is SessionManagerFull) {
|
||||
sessionManager = SessionManagerNoOp()
|
||||
}
|
||||
|
@ -809,12 +829,14 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
logger.debug("[${handshakeConnection.details}] $connType (${newConnection.id}) done with handshake.")
|
||||
}
|
||||
|
||||
connection0 = newConnection
|
||||
newConnection.setImage()
|
||||
|
||||
|
||||
// in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session
|
||||
// when applicable - we ALSO want to restore RMI objects BEFORE the connection is fully setup!
|
||||
val newSession = sessionManager.onInit(newConnection)
|
||||
|
||||
newConnection.setImage()
|
||||
|
||||
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
|
||||
if (newSession) {
|
||||
listenerManager.notifyInit(newConnection)
|
||||
|
@ -822,29 +844,30 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
|
||||
// this enables the connection to start polling for messages
|
||||
addConnection(newConnection)
|
||||
connection0 = newConnection
|
||||
|
||||
|
||||
// if we shutdown/close before the poller starts, we don't want to block forever
|
||||
pollerClosedLatch = CountDownLatch(1)
|
||||
networkEventPoller.submit(
|
||||
action = object : EventActionOperator {
|
||||
override fun invoke(): Int {
|
||||
val connection = connection0
|
||||
|
||||
// if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
|
||||
// we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
|
||||
return if (!(shutdownEventPoller || newConnection.isClosed() || newConnection.isClosedWithTimeout())) {
|
||||
newConnection.poll()
|
||||
} else {
|
||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("[${connection}] connection expired (cleanup)")
|
||||
return if (connection != null) {
|
||||
if (!shutdownEventPoller && connection.canPoll()) {
|
||||
connection.poll()
|
||||
} else {
|
||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||
logger.error("[${connection}] connection expired (cleanup). shutdownEventPoller=$shutdownEventPoller isClosed()=${connection.isClosed()} isClosedWithTimeout=${connection.isClosedWithTimeout()}")
|
||||
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("[{}] connection expired (cleanup)", connection)
|
||||
}
|
||||
// remove ourselves from processing
|
||||
EventPoller.REMOVE
|
||||
}
|
||||
|
||||
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
|
||||
removeConnection(newConnection)
|
||||
|
||||
// we already removed the connection, we can call it again without side effects
|
||||
newConnection.close()
|
||||
|
||||
} else {
|
||||
// remove ourselves from processing
|
||||
EventPoller.REMOVE
|
||||
}
|
||||
|
@ -852,14 +875,20 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
},
|
||||
onClose = object : EventCloseOperator {
|
||||
override fun invoke() {
|
||||
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
||||
val uncleanDisconnect = !shutdownEventPoller && !newConnection.isClosed() && newConnection.isClosedWithTimeout()
|
||||
val autoReconnect = mustRestartDriverOnError || uncleanDisconnect
|
||||
val connection = connection0
|
||||
if (connection == null) {
|
||||
logger.error("Unable to continue, as the connection has been removed before event dispatch shutdown!")
|
||||
return
|
||||
}
|
||||
|
||||
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
||||
val dirtyDisconnectWithSession = (this@Client is SessionClient) && !shutdownEventPoller && connection.isDirtyClose()
|
||||
|
||||
autoReconnect = mustRestartDriverOnError || dirtyDisconnectWithSession
|
||||
|
||||
if (mustRestartDriverOnError) {
|
||||
logger.error("[{}] Critical driver error detected, reconnecting client", connection)
|
||||
} else if (dirtyDisconnect) {
|
||||
} else if (dirtyDisconnectWithSession) {
|
||||
logger.error("[{}] Dirty disconnect detected, reconnecting client", connection)
|
||||
}
|
||||
|
||||
|
@ -871,13 +900,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
// we only need to run shutdown methods if there was a network outage or D/C
|
||||
if (!shutdownInProgress.value) {
|
||||
// this is because we restart automatically on driver errors/weird timeouts
|
||||
val standardClose = !autoReconnect
|
||||
this@Client.close(
|
||||
closeEverything = false,
|
||||
sendDisconnectMessage = standardClose,
|
||||
notifyDisconnect = standardClose,
|
||||
releaseWaitingThreads = standardClose
|
||||
)
|
||||
this@Client.close(closeEverything = false, sendDisconnectMessage = true, releaseWaitingThreads = !autoReconnect)
|
||||
}
|
||||
|
||||
|
||||
|
@ -885,29 +908,49 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
endpointIsRunning.lazySet(false)
|
||||
pollerClosedLatch.countDown()
|
||||
|
||||
|
||||
connection0 = null
|
||||
|
||||
if (autoReconnect) {
|
||||
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
|
||||
// clients can reconnect automatically ONLY if there are driver errors, otherwise it's explicit!
|
||||
eventDispatch.CLOSE.launch {
|
||||
logger.error("MUST AUTORECONNECT STARTING ON CLOSE ***********************************")
|
||||
waitForEndpointShutdown()
|
||||
|
||||
// also wait for everyone else to shutdown!!
|
||||
aeronDriver.internal.endPointUsages.forEach {
|
||||
it.waitForEndpointShutdown()
|
||||
if (it !== this@Client) {
|
||||
it.waitForEndpointShutdown()
|
||||
}
|
||||
}
|
||||
|
||||
// if we restart/reconnect too fast, errors from the previous run will still be present!
|
||||
aeronDriver.delayLingerTimeout()
|
||||
|
||||
reconnect()
|
||||
if (connectionTimeoutSec == 0) {
|
||||
logger.info("Reconnecting...", Exception())
|
||||
} else {
|
||||
logger.info("Reconnecting... (timeout in $connectionTimeoutSec seconds)", Exception())
|
||||
}
|
||||
|
||||
connect(
|
||||
remoteAddress = address,
|
||||
remoteAddressString = addressString,
|
||||
remoteAddressPrettyString = addressPrettyString,
|
||||
port1 = port1,
|
||||
port2 = port2,
|
||||
connectionTimeoutSec = connectionTimeoutSec,
|
||||
reliable = reliable,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
logger.debug("[{}] Closed the Network Event Poller...", connection)
|
||||
}
|
||||
logger.debug("[{}] Closed the Network Event Poller task.", connection)
|
||||
}
|
||||
})
|
||||
|
||||
if (newSession) {
|
||||
listenerManager.notifyConnect(newConnection)
|
||||
} else {
|
||||
listenerManager.notifyConnect(newConnection)
|
||||
|
||||
if (!newSession) {
|
||||
(newConnection as SessionConnection).sendPendingMessages()
|
||||
}
|
||||
}
|
||||
|
@ -1007,12 +1050,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
*/
|
||||
fun close(closeEverything: Boolean = true) {
|
||||
stopConnectOnShutdown = true
|
||||
close(
|
||||
closeEverything = closeEverything,
|
||||
sendDisconnectMessage = true,
|
||||
notifyDisconnect = true,
|
||||
releaseWaitingThreads = true
|
||||
)
|
||||
close(closeEverything = closeEverything, sendDisconnectMessage = true, releaseWaitingThreads = true)
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
|
|
|
@ -233,7 +233,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
|||
// this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
|
||||
// so we can modify this as we go
|
||||
connections.forEach { connection ->
|
||||
if (!(connection.isClosed() || connection.isClosedWithTimeout()) ) {
|
||||
if (connection.canPoll()) {
|
||||
// Otherwise, poll the connection for messages
|
||||
pollCount += connection.poll()
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue