Pub/Sub are now closed when exceptions are thrown

This commit is contained in:
Robinson 2022-12-17 23:21:44 +01:00
parent d408493907
commit e74f4a4e10
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 13 additions and 3 deletions

View File

@ -195,13 +195,13 @@ dependencies {
// https://github.com/MicroUtils/kotlin-logging // https://github.com/MicroUtils/kotlin-logging
api("io.github.microutils:kotlin-logging:3.0.4") api("io.github.microutils:kotlin-logging:3.0.4")
api("org.slf4j:slf4j-api:2.0.4") api("org.slf4j:slf4j-api:2.0.5")
testImplementation("junit:junit:4.13.2") testImplementation("junit:junit:4.13.2")
testImplementation("ch.qos.logback:logback-classic:1.4.4") testImplementation("ch.qos.logback:logback-classic:1.4.5")
} }
publishToSonatype { publishToSonatype {

View File

@ -499,7 +499,7 @@ open class Client<CONNECTION : Connection>(
isReliable = reliable isReliable = reliable
) )
type = "${udpConnection.type} '$remoteAddressString:${config.port}'" type = "${udpConnection.type} '$remoteAddressPrettyString:${config.port}'"
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.build(aeronDriver, logger) udpConnection.build(aeronDriver, logger)

View File

@ -160,6 +160,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
} }
// called from the connect thread // called from the connect thread
// when exceptions are thrown, the handshake pub/sub will be closed
fun hello(handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) : ClientConnectionInfo { fun hello(handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) : ClientConnectionInfo {
failedException = null failedException = null
connectKey = getSafeConnectKey() connectKey = getSafeConnectKey()
@ -206,11 +207,17 @@ internal class ClientHandshake<CONNECTION: Connection>(
val failedEx = failedException val failedEx = failedException
if (failedEx != null) { if (failedEx != null) {
subscription.close()
publication.close()
ListenerManager.cleanStackTraceInternal(failedEx) ListenerManager.cleanStackTraceInternal(failedEx)
throw failedEx throw failedEx
} }
if (connectionHelloInfo == null) { if (connectionHelloInfo == null) {
subscription.close()
publication.close()
val exception = ClientTimedOutException("[$aeronLogInfo] Waiting for registration response from server") val exception = ClientTimedOutException("[$aeronLogInfo] Waiting for registration response from server")
ListenerManager.cleanStackTraceInternal(exception) ListenerManager.cleanStackTraceInternal(exception)
throw exception throw exception
@ -220,6 +227,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
} }
// called from the connect thread // called from the connect thread
// when exceptions are thrown, the handshake pub/sub will be closed
fun done(handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) { fun done(handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) {
val registrationMessage = HandshakeMessage.doneFromClient(connectKey, val registrationMessage = HandshakeMessage.doneFromClient(connectKey,
handshakeConnection.port+1, handshakeConnection.port+1,
@ -272,6 +280,8 @@ internal class ClientHandshake<CONNECTION: Connection>(
val failedEx = failedException val failedEx = failedException
if (failedEx != null) { if (failedEx != null) {
handshakeConnection.subscription.close()
handshakeConnection.publication.close()
throw failedEx throw failedEx
} }