Better insight when specific client/server connection errors occur.

This commit is contained in:
Robinson 2022-04-04 16:30:05 +02:00
parent a0664f05b1
commit 533aec6c2d
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 56 additions and 53 deletions

View File

@ -34,6 +34,7 @@ import dorkbox.network.connection.eventLoop
import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRejectedException import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.handshake.ClientHandshake import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.ping.Ping import dorkbox.network.ping.Ping
@ -68,7 +69,7 @@ open class Client<CONNECTION : Connection>(
/** /**
* Gets the version number. * Gets the version number.
*/ */
const val version = "5.9.2" const val version = "5.10"
/** /**
* Checks to see if a client (using the specified configuration) is running. * Checks to see if a client (using the specified configuration) is running.
@ -334,7 +335,7 @@ open class Client<CONNECTION : Connection>(
// once we're done with the connection process, stop trying // once we're done with the connection process, stop trying
break break
} catch (e: ClientException) { } catch (e: ClientRetryException) {
handshake.reset() handshake.reset()
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it" // short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
@ -346,7 +347,8 @@ open class Client<CONNECTION : Connection>(
} }
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Un-recoverable error. Aborting.", e) logger.error("Un-recoverable error during handshake. Aborting.", e)
listenerManager.notifyError(e)
throw e throw e
} }
} }
@ -416,13 +418,7 @@ open class Client<CONNECTION : Connection>(
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class) // throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
val connectionInfo = try { val connectionInfo = handshake.hello(handshakeConnection, connectionTimeoutSec)
handshake.hello(handshakeConnection, connectionTimeoutSec)
} catch (e: Exception) {
logger.error("Handshake error", e)
throw e
}
// VALIDATE:: check to see if the remote connection's public key has changed! // VALIDATE:: check to see if the remote connection's public key has changed!
val validateRemoteAddress = if (isUsingIPC) { val validateRemoteAddress = if (isUsingIPC) {
@ -489,8 +485,6 @@ open class Client<CONNECTION : Connection>(
} else { } else {
ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!") ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!")
} }
logger.error("Initialization error", exception)
throw exception throw exception
} }
@ -571,7 +565,7 @@ open class Client<CONNECTION : Connection>(
// SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them // SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
// these have to be in two SEPARATE actionDispatch.launch commands.... otherwise... // these have to be in two SEPARATE actionDispatch.launch commands.... otherwise...
// if something inside of notifyConnect is blocking or suspends, then polling will never happen! // if something inside-of notifyConnect is blocking or suspends, then polling will never happen!
actionDispatch.launch { actionDispatch.launch {
waiter.doNotify() waiter.doNotify()
@ -614,7 +608,6 @@ open class Client<CONNECTION : Connection>(
val exception = ClientRejectedException("Unable to connect with server ${handshakeConnection.clientInfo()}") val exception = ClientRejectedException("Unable to connect with server ${handshakeConnection.clientInfo()}")
ListenerManager.cleanStackTrace(exception) ListenerManager.cleanStackTrace(exception)
logger.error("Connection ${connection.id}", exception)
throw exception throw exception
} }
} }

View File

@ -0,0 +1,21 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.exceptions
/**
* The server rejected this client when it tried to connect, but it should retry
*/
open class ClientRetryException(message: String, cause: Throwable? = null) : ClientException(message, cause)

View File

@ -18,18 +18,5 @@ package dorkbox.network.exceptions
/** /**
* The client timed out when it attempted to connect to the server. * The client timed out when it attempted to connect to the server.
*/ */
class ClientTimedOutException : ClientException { class ClientTimedOutException(message: String, cause: Throwable? = null) : ClientRetryException(message, cause) {
/**
* Create an exception.
*
* @param message The message
*/
constructor(message: String) : super(message)
/**
* Create an exception.
*
* @param cause The cause
*/
constructor(cause: Throwable) : super(cause)
} }

View File

@ -19,8 +19,10 @@ import dorkbox.network.Client
import dorkbox.network.aeron.MediaDriverConnection import dorkbox.network.aeron.MediaDriverConnection
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement import dorkbox.network.connection.CryptoManagement
import dorkbox.network.exceptions.ClientException import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.exceptions.ServerException
import io.aeron.FragmentAssembler import io.aeron.FragmentAssembler
import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header import io.aeron.logbuffer.Header
@ -54,10 +56,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
private var needToRetry = false private var needToRetry = false
@Volatile @Volatile
private var failedMessage: String = "" private var failedException: Exception? = null
@Volatile
private var failed: Boolean = true
init { init {
// now we have a bi-directional connection with the server on the handshake "socket". // now we have a bi-directional connection with the server on the handshake "socket".
@ -67,17 +66,19 @@ internal class ClientHandshake<CONNECTION: Connection>(
val message = endPoint.readHandshakeMessage(buffer, offset, length, header) val message = endPoint.readHandshakeMessage(buffer, offset, length, header)
val sessionId = header.sessionId() val sessionId = header.sessionId()
failedException = null
needToRetry = false
// it must be a registration message // it must be a registration message
if (message !is HandshakeMessage) { if (message !is HandshakeMessage) {
failedMessage = "[$sessionId] cancelled handshake for unrecognized message: $message" failedException = ClientRejectedException("[$sessionId] cancelled handshake for unrecognized message: $message")
failed = true
return@FragmentAssembler return@FragmentAssembler
} }
// this is an error message // this is an error message
if (message.state == HandshakeMessage.INVALID) { if (message.state == HandshakeMessage.INVALID) {
failedMessage = "[$sessionId] cancelled handshake for error: ${message.errorMessage}" val cause = ServerException(message.errorMessage ?: "Unknown").apply { stackTrace = stackTrace.copyOfRange(0, 1) }
failed = true failedException = ClientRejectedException("[$sessionId] cancelled handshake", cause)
return@FragmentAssembler return@FragmentAssembler
} }
@ -105,8 +106,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
if (registrationData != null && serverPublicKeyBytes != null) { if (registrationData != null && serverPublicKeyBytes != null) {
connectionHelloInfo = crypto.decrypt(registrationData, serverPublicKeyBytes) connectionHelloInfo = crypto.decrypt(registrationData, serverPublicKeyBytes)
} else { } else {
failedMessage = "[$message.sessionId] canceled handshake for message without registration and/or public key info" failedException = ClientRejectedException("[$message.sessionId] canceled handshake for message without registration and/or public key info")
failed = true
} }
} }
HandshakeMessage.HELLO_ACK_IPC -> { HandshakeMessage.HELLO_ACK_IPC -> {
@ -129,16 +129,15 @@ internal class ClientHandshake<CONNECTION: Connection>(
publicationPort = streamPubId, publicationPort = streamPubId,
kryoRegistrationDetails = regDetails) kryoRegistrationDetails = regDetails)
} else { } else {
failedMessage = "[$message.sessionId] canceled handshake for message without registration data" failedException = ClientRejectedException("[$message.sessionId] canceled handshake for message without registration data")
failed = true
} }
} }
HandshakeMessage.DONE_ACK -> { HandshakeMessage.DONE_ACK -> {
connectionDone = true connectionDone = true
} }
else -> { else -> {
failedMessage = "[$sessionId] cancelled handshake for message that is ${HandshakeMessage.toStateString(message.state)}" val stateString = HandshakeMessage.toStateString(message.state)
failed = true failedException = ClientRejectedException("[$sessionId] cancelled handshake for message that is $stateString")
} }
} }
} }
@ -146,7 +145,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
// called from the connect thread // called from the connect thread
fun hello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo { fun hello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
failed = false failedException = null
oneTimeKey = endPoint.crypto.secureRandom.nextInt() oneTimeKey = endPoint.crypto.secureRandom.nextInt()
val publicKey = endPoint.storage.getPublicKey()!! val publicKey = endPoint.storage.getPublicKey()!!
@ -172,7 +171,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)` // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1) pollCount = subscription.poll(handler, 1)
if (failed || connectionHelloInfo != null) { if (failedException != null || connectionHelloInfo != null) {
break break
} }
@ -180,10 +179,12 @@ internal class ClientHandshake<CONNECTION: Connection>(
pollIdleStrategy.idle(pollCount) pollIdleStrategy.idle(pollCount)
} }
if (failed) { val failedEx = failedException
if (failedEx != null) {
// no longer necessary to hold this connection open (if not a failure, we close the handshake after the DONE message) // no longer necessary to hold this connection open (if not a failure, we close the handshake after the DONE message)
handshakeConnection.close() handshakeConnection.close()
throw ClientException(failedMessage) ListenerManager.cleanStackTraceInternal(failedEx)
throw failedEx
} }
if (connectionHelloInfo == null) { if (connectionHelloInfo == null) {
// no longer necessary to hold this connection open (if not a failure, we close the handshake after the DONE message) // no longer necessary to hold this connection open (if not a failure, we close the handshake after the DONE message)
@ -208,7 +209,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
// block until we receive the connection information from the server // block until we receive the connection information from the server
failed = false failedException = null
var pollCount: Int var pollCount: Int
val subscription = handshakeConnection.subscription val subscription = handshakeConnection.subscription
val pollIdleStrategy = endPoint.pollIdleStrategyHandShake val pollIdleStrategy = endPoint.pollIdleStrategyHandShake
@ -220,7 +221,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)` // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
pollCount = subscription.poll(handler, 1) pollCount = subscription.poll(handler, 1)
if (failed || connectionDone) { if (failedException != null || connectionDone) {
break break
} }
@ -240,9 +241,11 @@ internal class ClientHandshake<CONNECTION: Connection>(
// finished with the handshake, so always close the connection // finished with the handshake, so always close the connection
handshakeConnection.close() handshakeConnection.close()
if (failed) { val failedEx = failedException
throw ClientException(failedMessage) if (failedEx != null) {
throw failedEx
} }
if (!connectionDone) { if (!connectionDone) {
throw ClientTimedOutException("Waiting for registration response from server") throw ClientTimedOutException("Waiting for registration response from server")
} }
@ -255,7 +258,6 @@ internal class ClientHandshake<CONNECTION: Connection>(
connectionHelloInfo = null connectionHelloInfo = null
connectionDone = false connectionDone = false
needToRetry = false needToRetry = false
failedMessage = "" failedException = null
failed = true
} }
} }