When closing a connection, we use the writeMutex instead of 'messagesInProgress'

This commit is contained in:
Robinson 2023-02-21 19:36:25 +01:00
parent 1fd9cdd405
commit 63f60a5f96
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -135,9 +135,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*/
val rmi: RmiSupportConnection<out Connection>
// a record of how many messages are in progress of being sent. When closing the connection, this number must be 0
private val messagesInProgress = atomic(0)
// we customize the toString() value for this connection, and it's just better to cache it's value (since it's a modestly complex string)
private val toString0: String
@ -227,8 +224,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/
suspend fun send(message: Any): Boolean {
messagesInProgress.getAndIncrement()
// we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time.
// NOTE: additionally we want to propagate back-pressure to the calling coroutines, PER CONNECTION!
@ -258,8 +253,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
false
} finally {
messagesInProgress.getAndDecrement()
}
}
}
@ -283,19 +276,10 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @return true if the message was successfully sent by aeron
*/
suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean {
suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit = {}): Boolean {
return endPoint.ping(this, pingTimeoutSeconds, function)
}
/**
* A message in progress means that we have requested to send an object over the network, but it hasn't finished sending over the network
*
* @return the number of messages in progress for this connection.
*/
fun messagesInProgress(): Int {
return messagesInProgress.value
}
/**
* Adds a function that will be called when a client/server "disconnects" with
* each other
@ -413,7 +397,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything
// we want a timeout-check, otherwise this will run forever
while (messagesInProgress.value != 0 && System.nanoTime() - closeTimeoutTime < timoutInNanos) {
while (writeMutex.isLocked && System.nanoTime() - closeTimeoutTime < timoutInNanos) {
sleep(50)
}