Fixed issue with multi-client handshakes. Made error messages cleaner
This commit is contained in:
parent
5c7a2b5d2d
commit
62dff03df9
@ -37,6 +37,16 @@ internal class ListenerManager<CONNECTION: Connection> {
|
|||||||
@Property
|
@Property
|
||||||
val LOAD_FACTOR = 0.8f
|
val LOAD_FACTOR = 0.8f
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove from the stacktrace EVERYTHING except the message. This is for propagating internal errors
|
||||||
|
*
|
||||||
|
* Neither of these are useful in resolving exception handling from a users perspective, and only clutter the stacktrace.
|
||||||
|
*/
|
||||||
|
fun noStackTrace(throwable: Throwable) {
|
||||||
|
// keep just one, since it's a stack frame INSIDE our network library, and we need that!
|
||||||
|
throwable.stackTrace = throwable.stackTrace.copyOfRange(0, 1)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove from the stacktrace (going in reverse), kotlin coroutine info + dorkbox network call stack.
|
* Remove from the stacktrace (going in reverse), kotlin coroutine info + dorkbox network call stack.
|
||||||
*
|
*
|
||||||
|
@ -19,6 +19,7 @@ import dorkbox.network.aeron.MediaDriverConnection
|
|||||||
import dorkbox.network.connection.Connection
|
import dorkbox.network.connection.Connection
|
||||||
import dorkbox.network.connection.CryptoManagement
|
import dorkbox.network.connection.CryptoManagement
|
||||||
import dorkbox.network.connection.EndPoint
|
import dorkbox.network.connection.EndPoint
|
||||||
|
import dorkbox.network.connection.ListenerManager
|
||||||
import dorkbox.network.exceptions.ClientException
|
import dorkbox.network.exceptions.ClientException
|
||||||
import dorkbox.network.exceptions.ClientTimedOutException
|
import dorkbox.network.exceptions.ClientTimedOutException
|
||||||
import io.aeron.FragmentAssembler
|
import io.aeron.FragmentAssembler
|
||||||
@ -55,18 +56,21 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
// this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe!
|
// this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe!
|
||||||
|
|
||||||
val message = endPoint.readHandshakeMessage(buffer, offset, length, header)
|
val message = endPoint.readHandshakeMessage(buffer, offset, length, header)
|
||||||
|
|
||||||
val sessionId = header.sessionId()
|
val sessionId = header.sessionId()
|
||||||
|
|
||||||
// it must be a registration message
|
// it must be a registration message
|
||||||
if (message !is HandshakeMessage) {
|
if (message !is HandshakeMessage) {
|
||||||
failed = ClientException("[$sessionId] server returned unrecognized message: $message")
|
val exception = ClientException("[$sessionId] cancelled handshake for unrecognized message: $message")
|
||||||
|
ListenerManager.noStackTrace(exception)
|
||||||
|
failed = exception
|
||||||
return@FragmentAssembler
|
return@FragmentAssembler
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is an error message
|
// this is an error message
|
||||||
if (message.state == HandshakeMessage.INVALID) {
|
if (message.state == HandshakeMessage.INVALID) {
|
||||||
failed = ClientException("[$sessionId] error: ${message.errorMessage}")
|
val exception = ClientException("[$sessionId] cancelled handshake for error: ${message.errorMessage}")
|
||||||
|
ListenerManager.noStackTrace(exception)
|
||||||
|
failed = exception
|
||||||
return@FragmentAssembler
|
return@FragmentAssembler
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,13 +82,14 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (oneTimeKey != message.oneTimeKey) {
|
if (oneTimeKey != message.oneTimeKey) {
|
||||||
failed = ClientException("[$message.sessionId] ignored message (one-time key: ${message.oneTimeKey}) intended for another client (mine is: ${oneTimeKey})")
|
val exception = ClientException("[$message.sessionId] ignored message (one-time key: ${message.oneTimeKey}) intended for another client (mine is: ${oneTimeKey})")
|
||||||
|
ListenerManager.noStackTrace(exception)
|
||||||
|
endPoint.listenerManager.notifyError(exception)
|
||||||
return@FragmentAssembler
|
return@FragmentAssembler
|
||||||
}
|
}
|
||||||
|
|
||||||
// it must be the correct state
|
// it must be the correct state
|
||||||
val registrationData = message.registrationData
|
val registrationData = message.registrationData
|
||||||
|
|
||||||
when (message.state) {
|
when (message.state) {
|
||||||
HandshakeMessage.HELLO_ACK -> {
|
HandshakeMessage.HELLO_ACK -> {
|
||||||
// The message was intended for this client. Try to parse it as one of the available message types.
|
// The message was intended for this client. Try to parse it as one of the available message types.
|
||||||
@ -94,7 +99,9 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
if (registrationData != null && serverPublicKeyBytes != null) {
|
if (registrationData != null && serverPublicKeyBytes != null) {
|
||||||
connectionHelloInfo = crypto.decrypt(registrationData, serverPublicKeyBytes)
|
connectionHelloInfo = crypto.decrypt(registrationData, serverPublicKeyBytes)
|
||||||
} else {
|
} else {
|
||||||
failed = ClientException("[$message.sessionId] ignored message without registration and/or public key info")
|
val exception = ClientException("[$message.sessionId] canceled handshake for message without registration and/or public key info")
|
||||||
|
ListenerManager.noStackTrace(exception)
|
||||||
|
failed = exception
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
HandshakeMessage.HELLO_ACK_IPC -> {
|
HandshakeMessage.HELLO_ACK_IPC -> {
|
||||||
@ -119,7 +126,9 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
connectionDone = true
|
connectionDone = true
|
||||||
}
|
}
|
||||||
else -> {
|
else -> {
|
||||||
failed = ClientException("[$sessionId] ignored message that is ${HandshakeMessage.toStateString(message.state)}")
|
val exception = ClientException("[$sessionId] cancelled handshake for message that is ${HandshakeMessage.toStateString(message.state)}")
|
||||||
|
ListenerManager.noStackTrace(exception)
|
||||||
|
failed = exception
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -127,6 +136,7 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
|
|
||||||
// called from the connect thread
|
// called from the connect thread
|
||||||
suspend fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long) : ClientConnectionInfo {
|
suspend fun handshakeHello(handshakeConnection: MediaDriverConnection, connectionTimeoutMS: Long) : ClientConnectionInfo {
|
||||||
|
failed = null
|
||||||
oneTimeKey = endPoint.crypto.secureRandom.nextInt()
|
oneTimeKey = endPoint.crypto.secureRandom.nextInt()
|
||||||
val publicKey = endPoint.settingsStore.getPublicKey()!!
|
val publicKey = endPoint.settingsStore.getPublicKey()!!
|
||||||
|
|
||||||
@ -136,11 +146,10 @@ internal class ClientHandshake<CONNECTION: Connection>(private val crypto: Crypt
|
|||||||
val pollIdleStrategy = endPoint.config.pollIdleStrategy
|
val pollIdleStrategy = endPoint.config.pollIdleStrategy
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
endPoint.writeHandshakeMessage(publication, HandshakeMessage.helloFromClient(oneTimeKey, publicKey))
|
endPoint.writeHandshakeMessage(publication, HandshakeMessage.helloFromClient(oneTimeKey, publicKey))
|
||||||
|
|
||||||
// block until we receive the connection information from the server
|
// block until we receive the connection information from the server
|
||||||
|
|
||||||
failed = null
|
|
||||||
var pollCount: Int
|
var pollCount: Int
|
||||||
|
|
||||||
val startTime = System.currentTimeMillis()
|
val startTime = System.currentTimeMillis()
|
||||||
|
Loading…
Reference in New Issue
Block a user