Now safely attempts to close a connection; if closing is not possible, the error is only logged (no exception is thrown)

This commit is contained in:
Robinson 2023-09-26 19:53:27 +02:00
parent 653236a7e2
commit b55168a3eb
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
7 changed files with 158 additions and 38 deletions

View File

@ -673,6 +673,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
val connectionInfo = handshake.hello(
endPoint = this,
handshakeConnection = handshakeConnection,
handshakeTimeoutNs = handshakeTimeoutNs
)
@ -685,7 +686,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
}
if (validateRemoteAddress == PublicKeyValidationState.INVALID) {
handshakeConnection.close()
handshakeConnection.close(this)
val exception = ClientRejectedException("Connection to [$addressString] not allowed! Public key mismatch.")
listenerManager.notifyError(exception)
@ -714,7 +715,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
// only have to do one
serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails)
} catch (e: Exception) {
handshakeConnection.close()
handshakeConnection.close(this)
// because we are getting the class registration details from the SERVER, this should never be the case.
// It is still an edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
@ -785,9 +786,11 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
// also closes the handshake (will also throw connect timeout exception)
try {
handshake.done(handshakeConnection, clientConnection,
handshakeTimeoutNs = handshakeTimeoutNs,
logInfo = handshakeConnection.details
handshake.done(
endPoint = this,
handshakeConnection, clientConnection,
handshakeTimeoutNs = handshakeTimeoutNs,
logInfo = handshakeConnection.details
)
} catch (e: Exception) {
listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] error during handshake", e))
@ -795,7 +798,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
}
// finished with the handshake, so always close these!
handshakeConnection.close()
handshakeConnection.close(this)
if (logger.isDebugEnabled) {
logger.debug("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] done with handshake.")

View File

@ -544,9 +544,16 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo
Thread.sleep(200L)
}
close(publication, logInfo)
var closeException: Exception? = null
try {
// we might not be able to close this connection.
close(publication, logInfo)
}
catch (e: Exception) {
closeException = e
}
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}"))
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Publication timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${publication.channel()} streamId=${publication.streamId()}", closeException))
exception.cleanAllStackTrace()
throw exception
}
@ -574,9 +581,16 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo
Thread.sleep(200L)
}
close(subscription, logInfo)
var closeException: Exception? = null
try {
// we might not be able to close this connection.
close(subscription, logInfo)
}
catch (e: Exception) {
closeException = e
}
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Subscription timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${subscription.channel()} streamId=${subscription.streamId()}"))
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Subscription timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${subscription.channel()} streamId=${subscription.streamId()}", closeException))
exception.cleanAllStackTrace()
throw exception
}

View File

@ -360,7 +360,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// on close, we want to make sure this file is DELETED!
endPoint.aeronDriver.close(subscription, toString0)
try {
// we might not be able to close this connection!!
endPoint.aeronDriver.close(subscription, toString0)
}
catch (e: Exception) {
endPoint.listenerManager.notifyError(e)
}
// notify the remote endPoint that we are closing
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
@ -378,7 +385,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// on close, we want to make sure this file is DELETED!
endPoint.aeronDriver.close(publication, toString0)
try {
// we might not be able to close this connection.
endPoint.aeronDriver.close(publication, toString0)
}
catch (e: Exception) {
endPoint.listenerManager.notifyError(e)
}
// NOTE: any waiting RMI messages that are in-flight will terminate when they time-out (and then do nothing)
// NOTE: notifyDisconnect() is called inside closeAction()!!

View File

@ -18,6 +18,7 @@ package dorkbox.network.handshake
import dorkbox.network.Client
import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
import dorkbox.network.exceptions.*
@ -176,7 +177,11 @@ internal class ClientHandshake<CONNECTION: Connection>(
// called from the connect thread
// when exceptions are thrown, the handshake pub/sub will be closed
fun hello(handshakeConnection: ClientHandshakeDriver, handshakeTimeoutNs: Long) : ClientConnectionInfo {
fun hello(
endPoint: EndPoint<CONNECTION>,
handshakeConnection: ClientHandshakeDriver,
handshakeTimeoutNs: Long
) : ClientConnectionInfo {
val pubSub = handshakeConnection.pubSub
// is our pub still connected??
@ -198,7 +203,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
portSub = pubSub.portSub
))
} catch (e: Exception) {
handshakeConnection.close()
handshakeConnection.close(endPoint)
throw TransmitException("$handshakeConnection Handshake message error!", e)
}
@ -219,14 +224,14 @@ internal class ClientHandshake<CONNECTION: Connection>(
val failedEx = failedException
if (failedEx != null) {
handshakeConnection.close()
handshakeConnection.close(endPoint)
failedEx.cleanStackTraceInternal()
throw failedEx
}
if (connectionHelloInfo == null) {
handshakeConnection.close()
handshakeConnection.close(endPoint)
val exception = ClientTimedOutException("$handshakeConnection Waiting for registration response from server for more than ${Sys.getTimePrettyFull(handshakeTimeoutNs)}")
throw exception
@ -238,6 +243,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
// called from the connect thread
// when exceptions are thrown, the handshake pub/sub will be closed
fun done(
endPoint: EndPoint<CONNECTION>,
handshakeConnection: ClientHandshakeDriver,
clientConnection: ClientConnectionDriver,
handshakeTimeoutNs: Long,
@ -260,7 +266,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
streamIdSub = handshakePubSub.streamIdSub
))
} catch (e: Exception) {
handshakeConnection.close()
handshakeConnection.close(endPoint)
throw TransmitException("$handshakeConnection Handshake message error!", e)
}
@ -291,14 +297,14 @@ internal class ClientHandshake<CONNECTION: Connection>(
val failedEx = failedException
if (failedEx != null) {
handshakeConnection.close()
handshakeConnection.close(endPoint)
throw failedEx
}
if (!connectionDone) {
// since this failed, close everything
handshakeConnection.close()
handshakeConnection.close(endPoint)
val exception = ClientTimedOutException("Timed out waiting for registration response from server: ${Sys.getTimePrettyFull(handshakeTimeoutNs)}")
throw exception

View File

@ -329,7 +329,7 @@ internal class ClientHandshakeDriver(
}
}
fun close() {
fun close(endpoint: EndPoint<*>) {
// only the subs are allocated on the client!
// sessionIdAllocator.free(pubSub.sessionIdPub)
// sessionIdAllocator.free(sessionIdSub)
@ -337,7 +337,19 @@ internal class ClientHandshakeDriver(
streamIdAllocator.free(pubSub.streamIdSub)
// on close, we want to make sure this file is DELETED!
aeronDriver.close(pubSub.sub, logInfo)
aeronDriver.close(pubSub.pub, logInfo)
// we might not be able to close these connections.
try {
aeronDriver.close(pubSub.sub, logInfo)
}
catch (e: Exception) {
endpoint.listenerManager.notifyError(e)
}
try {
aeronDriver.close(pubSub.pub, logInfo)
}
catch (e: Exception) {
endpoint.listenerManager.notifyError(e)
}
}
}

View File

@ -18,6 +18,7 @@ package dorkbox.network.handshake
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.IpInfo
import io.aeron.ChannelUriStringBuilder
import io.aeron.CommonContext
@ -62,8 +63,14 @@ internal class ServerHandshakeDriver(
}
}
fun close() {
aeronDriver.close(subscription, logInfo)
fun close(endPoint: EndPoint<*>) {
try {
// we might not be able to close this connection.
aeronDriver.close(subscription, logInfo)
}
catch (e: Exception) {
endPoint.listenerManager.notifyError(e)
}
}
override fun toString(): String {

View File

@ -73,7 +73,14 @@ internal object ServerHandshakePollers {
}
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<Long, Publication> { connectKey, publication ->
driver.close(publication, "Server IPC Handshake ($connectKey)")
try {
// we might not be able to close this connection.
driver.close(publication, "Server IPC Handshake ($connectKey)")
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
.build<Long, Publication>()
@ -167,14 +174,27 @@ internal object ServerHandshakePollers {
if (success) {
publications[connectKey] = publication
} else {
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.deleteLogFile(image)
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
} else {
@ -207,7 +227,13 @@ internal object ServerHandshakePollers {
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.deleteLogFile(image)
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
}
}
@ -215,7 +241,13 @@ internal object ServerHandshakePollers {
fun close() {
publications.forEach { (connectKey, publication) ->
AeronDriver.sessionIdAllocator.free(publication.sessionId())
driver.close(publication, "Server Handshake ($connectKey)")
try {
// we might not be able to close this connection.
driver.close(publication, "Server Handshake ($connectKey)")
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
publications.clear()
}
@ -250,7 +282,13 @@ internal object ServerHandshakePollers {
}
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<Long, Publication> { connectKey, publication ->
driver.close(publication, "Server UDP Handshake ($connectKey)")
try {
// we might not be able to close this connection.
driver.close(publication, "Server UDP Handshake ($connectKey)")
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
.build<Long, Publication>()
@ -398,14 +436,27 @@ internal object ServerHandshakePollers {
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.deleteLogFile(image)
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
} catch (e: Exception) {
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
driver.deleteLogFile(image)
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (closeError: Exception) {
// Report the close failure instead of retrying the close: a second attempt can throw again,
// which would escape this error path and skip the handshake error notification that follows.
server.listenerManager.notifyError(closeError)
}
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
} else {
@ -435,7 +486,13 @@ internal object ServerHandshakePollers {
server.listenerManager.notifyError(ServerHandshakeException("[$logInfo] Error processing IPC handshake", e))
}
driver.close(publication, logInfo)
try {
// we might not be able to close this connection.
driver.close(publication, logInfo)
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
// we should immediately remove the logbuffer for this! Aeron will **EVENTUALLY** remove the logbuffer, but if errors
// and connections occur too quickly (within the cleanup/linger period), we can run out of memory!
@ -447,7 +504,14 @@ internal object ServerHandshakePollers {
fun close() {
publications.forEach { (connectKey, publication) ->
AeronDriver.sessionIdAllocator.free(publication.sessionId())
driver.close(publication, "Server Handshake ($connectKey)")
try {
// we might not be able to close this connection.
driver.close(publication, "Server Handshake ($connectKey)")
}
catch (e: Exception) {
server.listenerManager.notifyError(e)
}
}
publications.clear()
}
@ -485,7 +549,7 @@ internal object ServerHandshakePollers {
override fun close() {
delegate.close()
handler.clear()
driver.close()
driver.close(server)
logger.info("Closed IPC poller")
}
@ -534,7 +598,7 @@ internal object ServerHandshakePollers {
override fun close() {
delegate.close()
handler.clear()
driver.close()
driver.close(server)
logger.info("Closed IPv4 poller")
}
@ -581,7 +645,7 @@ internal object ServerHandshakePollers {
override fun close() {
delegate.close()
handler.clear()
driver.close()
driver.close(server)
logger.info("Closed IPv4 poller")
}
@ -629,7 +693,7 @@ internal object ServerHandshakePollers {
override fun close() {
delegate.close()
handler.clear()
driver.close()
driver.close(server)
logger.info("Closed IPv4+6 poller")
}