Cleanup client reconnect logic
This commit is contained in:
parent
b2f2077550
commit
e84be7f96a
@ -453,13 +453,25 @@ open class Client<CONNECTION : Connection>(
|
|||||||
throw exception
|
throw exception
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (slowDownForException) {
|
||||||
|
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
|
||||||
|
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
|
||||||
|
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
|
||||||
|
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
|
||||||
|
|
||||||
|
// ALSO, we want to make sure we DO NOT approach the linger timeout!
|
||||||
|
aeronDriver.delayLingerTimeout(2)
|
||||||
|
}
|
||||||
|
|
||||||
// we have to pre-set the type (which will ultimately get set to the correct type on success)
|
// we have to pre-set the type (which will ultimately get set to the correct type on success)
|
||||||
var type = ""
|
var type = ""
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// always start the aeron driver inside the restart loop. If we've already started the driver (on the first "start"),
|
// always start the aeron driver inside the restart loop.
|
||||||
// then this does nothing
|
// If we've already started the driver (on the first "start"), then this does nothing (slowDownForException also make this not "double check")
|
||||||
startDriver()
|
if (slowDownForException) {
|
||||||
|
startDriver()
|
||||||
|
}
|
||||||
|
|
||||||
// the handshake connection is closed when the handshake has an error, or it is finished
|
// the handshake connection is closed when the handshake has an error, or it is finished
|
||||||
val handshakeConnection = if (autoChangeToIpc) {
|
val handshakeConnection = if (autoChangeToIpc) {
|
||||||
@ -533,6 +545,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
|
|
||||||
connect0(handshake, handshakeConnection, handshakeTimeoutSec)
|
connect0(handshake, handshakeConnection, handshakeTimeoutSec)
|
||||||
success = true
|
success = true
|
||||||
|
slowDownForException = false
|
||||||
|
|
||||||
// once we're done with the connection process, stop trying
|
// once we're done with the connection process, stop trying
|
||||||
break
|
break
|
||||||
@ -548,23 +561,11 @@ open class Client<CONNECTION : Connection>(
|
|||||||
// maybe the aeron driver isn't running? (or isn't running correctly?)
|
// maybe the aeron driver isn't running? (or isn't running correctly?)
|
||||||
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
||||||
|
|
||||||
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
|
slowDownForException = true
|
||||||
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
|
|
||||||
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
|
|
||||||
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
|
|
||||||
|
|
||||||
// ALSO, we want to make sure we DO NOT approach the linger timeout!
|
|
||||||
sleep(aeronDriver.driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds(aeronDriver.getLingerNs()*2)))
|
|
||||||
} catch (e: ClientRejectedException) {
|
} catch (e: ClientRejectedException) {
|
||||||
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
||||||
|
|
||||||
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
|
slowDownForException = true
|
||||||
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
|
|
||||||
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
|
|
||||||
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
|
|
||||||
|
|
||||||
// ALSO, we want to make sure we DO NOT approach the linger timeout!
|
|
||||||
sleep(aeronDriver.driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds(aeronDriver.getLingerNs() * 2)))
|
|
||||||
|
|
||||||
if (e.cause is ServerException) {
|
if (e.cause is ServerException) {
|
||||||
val cause = e.cause!!
|
val cause = e.cause!!
|
||||||
@ -580,14 +581,6 @@ open class Client<CONNECTION : Connection>(
|
|||||||
|
|
||||||
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
|
||||||
|
|
||||||
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
|
|
||||||
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
|
|
||||||
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
|
|
||||||
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
|
|
||||||
|
|
||||||
// ALSO, we want to make sure we DO NOT approach the linger timeout!
|
|
||||||
sleep(aeronDriver.driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds(aeronDriver.getLingerNs() * 2)))
|
|
||||||
|
|
||||||
listenerManager.notifyError(e)
|
listenerManager.notifyError(e)
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user