AeronDriver now uses suspend instead of blocking

This commit is contained in:
Robinson 2022-07-16 10:24:45 -04:00
parent c30e2cde4c
commit 440512aedc
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
8 changed files with 62 additions and 40 deletions

View File

@ -11,6 +11,9 @@ import io.aeron.Publication
import io.aeron.Subscription
import io.aeron.driver.MediaDriver
import io.aeron.samples.SamplesUtil
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
@ -21,9 +24,6 @@ import org.agrona.concurrent.status.CountersReader
import org.slf4j.LoggerFactory
import java.io.File
import java.lang.Thread.sleep
import java.util.concurrent.locks.*
import kotlin.concurrent.read
import kotlin.concurrent.write
/**
* Class for managing the Aeron+Media drivers
@ -61,7 +61,7 @@ class AeronDriver(
private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS
// prevents multiple instances, within the same JVM, from starting at the exact same time.
private val lock = ReentrantReadWriteLock()
private val mutex = Mutex()
private fun setConfigDefaults(config: Configuration, logger: KLogger) {
// explicitly don't set defaults if we already have the context defined!
@ -234,8 +234,8 @@ class AeronDriver(
}
}
private fun isLoaded(): Boolean {
return lock.read {
private suspend fun isLoaded(): Boolean {
return mutex.withLock {
val mediaDriverLoaded = mediaDriverWasAlreadyRunning || mediaDriver != null
mediaDriverLoaded && aeron != null && aeron?.isClosed == false
}
@ -246,12 +246,12 @@ class AeronDriver(
*
* @return true if we are successfully connected to the aeron client
*/
fun start(): Boolean {
suspend fun start(): Boolean {
if (isLoaded()) {
return true
}
lock.write {
mutex.withLock {
if (!mediaDriverWasAlreadyRunning && mediaDriver == null) {
// only start if we didn't already start... There will be several checks.
@ -261,7 +261,7 @@ class AeronDriver(
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
// that instance
logger.debug { "Aeron Media driver already running. Double checking status..." }
sleep(context.driverTimeout/2)
delay(context.driverTimeout/2)
running = isRunning()
}
@ -277,7 +277,7 @@ class AeronDriver(
break
} catch (e: Exception) {
logger.warn(e) { "Unable to start the Aeron Media driver at ${context.driverDirectory}. Retrying $count more times..." }
sleep(context.driverTimeout)
delay(context.driverTimeout)
}
}
} else {
@ -325,7 +325,7 @@ class AeronDriver(
return true
}
fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
suspend fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
val uri = publicationUri.build()
// reasons we cannot add a pub/sub to aeron
@ -339,7 +339,7 @@ class AeronDriver(
// in the client, if we are unable to connect to the server, we will attempt to start the media driver + connect to aeron
lock.read {
mutex.withLock {
val aeron1 = aeron
if (aeron1 == null || aeron1.isClosed) {
// there was an error connecting to the aeron client or media driver.
@ -360,7 +360,7 @@ class AeronDriver(
}
}
fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
suspend fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
val uri = subscriptionUri.build()
// reasons we cannot add a pub/sub to aeron
@ -377,7 +377,7 @@ class AeronDriver(
// subscriptions do not depend on a response from the remote endpoint, and should always succeed if aeron is available
return lock.read {
return mutex.withLock {
val aeron1 = aeron
if (aeron1 == null || aeron1.isClosed) {
// there was an error connecting to the aeron client or media driver.
@ -413,8 +413,8 @@ class AeronDriver(
* NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
*/
fun close() {
lock.write {
suspend fun close() {
mutex.withLock {
try {
aeron?.close()
} catch (e: Exception) {

View File

@ -131,7 +131,7 @@ internal open class IpcMediaDriverConnection(
*
* serverAddress is ignored for IPC
*/
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()

View File

@ -31,7 +31,7 @@ abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionP
abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
abstract suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
abstract val clientInfo : String
abstract val serverInfo : String

View File

@ -168,7 +168,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
}
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
throw ClientException("Server info not implemented in Client MediaDriver Connection")
}
override val serverInfo: String

View File

@ -66,7 +66,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
throw ServerException("Client info not implemented in Server MediaDriver Connection")
}
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
val connectionString = aeronConnectionString(listenAddress)
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.

View File

@ -448,7 +448,7 @@ internal constructor(val type: Class<*>,
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = handshakeKryo.read(buffer, offset, length) as HandshakeMessage
logger.trace { "[${message.connectKey}] received HS: $message" }
logger.trace { "[${message.connectKey}] received HS: $message (Might not be for this connection)" }
message
} catch (e: Exception) {
@ -732,9 +732,10 @@ internal constructor(val type: Class<*>,
final override fun close() {
if (shutdown.compareAndSet(expect = false, update = true)) {
logger.info { "Shutting down..." }
aeronDriver.close()
runBlocking {
aeronDriver.close()
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
val enableRemove = type == Client::class.java

View File

@ -201,7 +201,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
fun processIpcHandshakeMessageServer(
suspend fun processIpcHandshakeMessageServer(
server: Server<CONNECTION>,
handshakePublication: Publication,
message: HandshakeMessage,
@ -336,7 +336,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
fun processUdpHandshakeMessageServer(
suspend fun processUdpHandshakeMessageServer(
server: Server<CONNECTION>,
handshakePublication: Publication,
remoteIpAndPort: String,

View File

@ -12,6 +12,7 @@ import dorkbox.network.connection.Connection
import io.aeron.FragmentAssembler
import io.aeron.Image
import io.aeron.logbuffer.Header
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer
internal object ServerHandshakePollers {
@ -23,7 +24,11 @@ internal object ServerHandshakePollers {
}
}
fun <CONNECTION : Connection> IPC(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server<CONNECTION>): AeronPoller {
suspend fun <CONNECTION : Connection> IPC(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
{
val logger = server.logger
val connectionFunc = server.connectionFunc
val handshake = server.handshake
@ -61,9 +66,11 @@ internal object ServerHandshakePollers {
return@FragmentAssembler
}
handshake.processIpcHandshakeMessageServer(
server, publication, message, aeronDriver, connectionFunc, logger
)
runBlocking {
handshake.processIpcHandshakeMessageServer(
server, publication, message, aeronDriver, connectionFunc, logger
)
}
}
override fun poll(): Int {
@ -86,7 +93,11 @@ internal object ServerHandshakePollers {
fun <CONNECTION : Connection> ip4(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server<CONNECTION>): AeronPoller {
suspend fun <CONNECTION : Connection> ip4(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
{
val logger = server.logger
val connectionFunc = server.connectionFunc
val handshake = server.handshake
@ -146,9 +157,11 @@ internal object ServerHandshakePollers {
return@FragmentAssembler
}
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger
)
runBlocking {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger
)
}
}
override fun poll(): Int {
@ -169,7 +182,11 @@ internal object ServerHandshakePollers {
return poller
}
fun <CONNECTION : Connection> ip6(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server<CONNECTION>): AeronPoller {
suspend fun <CONNECTION : Connection> ip6(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
{
val logger = server.logger
val connectionFunc = server.connectionFunc
val handshake = server.handshake
@ -227,9 +244,11 @@ internal object ServerHandshakePollers {
return@FragmentAssembler
}
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger
)
runBlocking {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger
)
}
}
override fun poll(): Int {
@ -250,7 +269,7 @@ internal object ServerHandshakePollers {
return poller
}
fun <CONNECTION : Connection> ip6Wildcard(
suspend fun <CONNECTION : Connection> ip6Wildcard(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>
@ -312,9 +331,11 @@ internal object ServerHandshakePollers {
return@FragmentAssembler
}
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, true, connectionFunc, logger
)
runBlocking {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message, aeronDriver, true, connectionFunc, logger
)
}
}
override fun poll(): Int {