From 63f60a5f9601b8d0a4a5ec2f22e0a5fbee700282 Mon Sep 17 00:00:00 2001 From: Robinson Date: Tue, 21 Feb 2023 19:36:25 +0100 Subject: [PATCH] When closing a connection, we use the writeMutex instead of 'messagesInProgress' --- src/dorkbox/network/connection/Connection.kt | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 9c8ae515..4cf4e4c4 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -135,9 +135,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { */ val rmi: RmiSupportConnection - // a record of how many messages are in progress of being sent. When closing the connection, this number must be 0 - private val messagesInProgress = atomic(0) - // we customize the toString() value for this connection, and it's just better to cache it's value (since it's a modestly complex string) private val toString0: String @@ -227,8 +224,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! */ suspend fun send(message: Any): Boolean { - messagesInProgress.getAndIncrement() - // we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time. // NOTE: additionally we want to propagate back-pressure to the calling coroutines, PER CONNECTION! @@ -258,8 +253,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { } false - } finally { - messagesInProgress.getAndDecrement() } } } @@ -283,19 +276,10 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * * @return true if the message was successfully sent by aeron */ - suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean { + suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit = {}): Boolean { return endPoint.ping(this, pingTimeoutSeconds, function) } - /** - * A message in progress means that we have requested to send an object over the network, but it hasn't finished sending over the network - * - * @return the number of messages in progress for this connection. - */ - fun messagesInProgress(): Int { - return messagesInProgress.value - } - /** * Adds a function that will be called when a client/server "disconnects" with * each other @@ -413,7 +397,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything // we want a timeout-check, otherwise this will run forever - while (messagesInProgress.value != 0 && System.nanoTime() - closeTimeoutTime < timoutInNanos) { + while (writeMutex.isLocked && System.nanoTime() - closeTimeoutTime < timoutInNanos) { sleep(50) }