Added a way to get messages in progress for a connection

This commit is contained in:
nathan 2020-08-26 16:24:17 +02:00
parent 31a32d303a
commit 986585f073

View File

@ -33,8 +33,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import javax.crypto.SecretKey
/** /**
* This connection is established once the registration information is validated, and the various connect/filter checks have passed * This connection is established once the registration information is validated, and the various connect/filter checks have passed
@ -117,7 +115,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter)
// The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
private val aes_gcm_iv = AtomicLong(0) private val aes_gcm_iv = atomic(0)
// RMI support for this connection // RMI support for this connection
internal val rmiConnectionSupport = endPoint.getRmiConnectionSupport() internal val rmiConnectionSupport = endPoint.getRmiConnectionSupport()
@ -167,24 +165,24 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return endPoint return endPoint
} }
/** // /**
* This is the per-message sequence number. // * This is the per-message sequence number.
* // *
* The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) // * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
* The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this // * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
* counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) // * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
*/ // */
fun nextGcmSequence(): Long { // fun nextGcmSequence(): Long {
return aes_gcm_iv.getAndIncrement() // return aes_gcm_iv.getAndIncrement()
} // }
//
/** // /**
* @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation). // * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation).
*/ // */
fun cryptoKey(): SecretKey { // fun cryptoKey(): SecretKey {
TODO() // TODO()
// return channelWrapper.cryptoKey() //// return channelWrapper.cryptoKey()
} // }
@ -193,7 +191,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* Polls the AERON media driver subscription channel for incoming messages * Polls the AERON media driver subscription channel for incoming messages
*/ */
internal fun pollSubscriptions(): Int { internal fun pollSubscriptions(): Int {
return subscription.poll(messageHandler, 1024) return subscription.poll(messageHandler, 4)
} }
/** /**
@ -267,6 +265,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
} }
/**
*
* @return the number of messages in progress for this connection.
*
* A message in progress means that we have requested to to send an object over the network, but it hasn't finished sending over the network
*/
fun messagesInProgress(): Int {
return messagesInProgress.value
}
/** /**