diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index e3390cfd..0863192f 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -11,6 +11,9 @@ import io.aeron.Publication import io.aeron.Subscription import io.aeron.driver.MediaDriver import io.aeron.samples.SamplesUtil +import kotlinx.coroutines.delay +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import mu.KLogger import mu.KotlinLogging import org.agrona.DirectBuffer @@ -21,9 +24,6 @@ import org.agrona.concurrent.status.CountersReader import org.slf4j.LoggerFactory import java.io.File import java.lang.Thread.sleep -import java.util.concurrent.locks.* -import kotlin.concurrent.read -import kotlin.concurrent.write /** * Class for managing the Aeron+Media drivers @@ -61,7 +61,7 @@ class AeronDriver( private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS // prevents multiple instances, within the same JVM, from starting at the exact same time. - private val lock = ReentrantReadWriteLock() + private val mutex = Mutex() private fun setConfigDefaults(config: Configuration, logger: KLogger) { // explicitly don't set defaults if we already have the context defined! @@ -234,8 +234,8 @@ class AeronDriver( } } - private fun isLoaded(): Boolean { - return lock.read { + private suspend fun isLoaded(): Boolean { + return mutex.withLock { val mediaDriverLoaded = mediaDriverWasAlreadyRunning || mediaDriver != null mediaDriverLoaded && aeron != null && aeron?.isClosed == false } @@ -246,12 +246,12 @@ class AeronDriver( * * @return true if we are successfully connected to the aeron client */ - fun start(): Boolean { + suspend fun start(): Boolean { if (isLoaded()) { return true } - lock.write { + mutex.withLock { if (!mediaDriverWasAlreadyRunning && mediaDriver == null) { // only start if we didn't already start... There will be several checks. @@ -261,7 +261,7 @@ class AeronDriver( // SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to // that instance logger.debug { "Aeron Media driver already running. Double checking status..." } - sleep(context.driverTimeout/2) + delay(context.driverTimeout/2) running = isRunning() } @@ -277,7 +277,7 @@ class AeronDriver( break } catch (e: Exception) { logger.warn(e) { "Unable to start the Aeron Media driver at ${context.driverDirectory}. Retrying $count more times..." } - sleep(context.driverTimeout) + delay(context.driverTimeout) } } } else { @@ -325,7 +325,7 @@ class AeronDriver( return true } - fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication { + suspend fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication { val uri = publicationUri.build() // reasons we cannot add a pub/sub to aeron @@ -339,7 +339,7 @@ class AeronDriver( // in the client, if we are unable to connect to the server, we will attempt to start the media driver + connect to aeron - lock.read { + mutex.withLock { val aeron1 = aeron if (aeron1 == null || aeron1.isClosed) { // there was an error connecting to the aeron client or media driver. @@ -360,7 +360,7 @@ class AeronDriver( } } - fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription { + suspend fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription { val uri = subscriptionUri.build() // reasons we cannot add a pub/sub to aeron @@ -377,7 +377,7 @@ class AeronDriver( // subscriptions do not depend on a response from the remote endpoint, and should always succeed if aeron is available - return lock.read { + return mutex.withLock { val aeron1 = aeron if (aeron1 == null || aeron1.isClosed) { // there was an error connecting to the aeron client or media driver. @@ -413,8 +413,8 @@ class AeronDriver( * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). */ - fun close() { - lock.write { + suspend fun close() { + mutex.withLock { try { aeron?.close() } catch (e: Exception) { diff --git a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt index a4ea0cd9..3b3e1b26 100644 --- a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt @@ -131,7 +131,7 @@ internal open class IpcMediaDriverConnection( * * serverAddress is ignored for IPC */ - override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. val publicationUri = uri() diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index 0229ec2e..505ced65 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -31,7 +31,7 @@ abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionP abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) - abstract fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false) + abstract suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false) abstract val clientInfo : String abstract val serverInfo : String diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index b403a40d..e92a6a25 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -168,7 +168,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } } - override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { throw ClientException("Server info not implemented in Client MediaDriver Connection") } override val serverInfo: String diff --git a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt index f97d957a..8cb46565 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt @@ -66,7 +66,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres throw ServerException("Client info not implemented in Server MediaDriver Connection") } - override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { val connectionString = aeronConnectionString(listenAddress) // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 7d31adef..3bbe357a 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -448,7 +448,7 @@ internal constructor(val type: Class<*>, // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! val message = handshakeKryo.read(buffer, offset, length) as HandshakeMessage - logger.trace { "[${message.connectKey}] received HS: $message" } + logger.trace { "[${message.connectKey}] received HS: $message (Might not be for this connection)" } message } catch (e: Exception) { @@ -732,9 +732,10 @@ internal constructor(val type: Class<*>, final override fun close() { if (shutdown.compareAndSet(expect = false, update = true)) { logger.info { "Shutting down..." } - aeronDriver.close() runBlocking { + aeronDriver.close() + // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() val enableRemove = type == Client::class.java diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index cad7e2c1..fda1e902 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -201,7 +201,7 @@ internal class ServerHandshake(private val logger: KLog // note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD - fun processIpcHandshakeMessageServer( + suspend fun processIpcHandshakeMessageServer( server: Server, handshakePublication: Publication, message: HandshakeMessage, @@ -336,7 +336,7 @@ internal class ServerHandshake(private val logger: KLog } // note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD - fun processUdpHandshakeMessageServer( + suspend fun processUdpHandshakeMessageServer( server: Server, handshakePublication: Publication, remoteIpAndPort: String, diff --git a/src/dorkbox/network/handshake/ServerHandshakePollers.kt b/src/dorkbox/network/handshake/ServerHandshakePollers.kt index 66c46458..4fd42683 100644 --- a/src/dorkbox/network/handshake/ServerHandshakePollers.kt +++ b/src/dorkbox/network/handshake/ServerHandshakePollers.kt @@ -12,6 +12,7 @@ import dorkbox.network.connection.Connection import io.aeron.FragmentAssembler import io.aeron.Image import io.aeron.logbuffer.Header +import kotlinx.coroutines.runBlocking import org.agrona.DirectBuffer internal object ServerHandshakePollers { @@ -23,7 +24,11 @@ internal object ServerHandshakePollers { } } - fun IPC(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server): AeronPoller { + suspend fun IPC( + aeronDriver: AeronDriver, + config: ServerConfiguration, + server: Server): AeronPoller + { val logger = server.logger val connectionFunc = server.connectionFunc val handshake = server.handshake @@ -61,9 +66,11 @@ internal object ServerHandshakePollers { return@FragmentAssembler } - handshake.processIpcHandshakeMessageServer( - server, publication, message, aeronDriver, connectionFunc, logger - ) + runBlocking { + handshake.processIpcHandshakeMessageServer( + server, publication, message, aeronDriver, connectionFunc, logger + ) + } } override fun poll(): Int { @@ -86,7 +93,11 @@ internal object ServerHandshakePollers { - fun ip4(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server): AeronPoller { + suspend fun ip4( + aeronDriver: AeronDriver, + config: ServerConfiguration, + server: Server): AeronPoller + { val logger = server.logger val connectionFunc = server.connectionFunc val handshake = server.handshake @@ -146,9 +157,11 @@ internal object ServerHandshakePollers { return@FragmentAssembler } - handshake.processUdpHandshakeMessageServer( - server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger - ) + runBlocking { + handshake.processUdpHandshakeMessageServer( + server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger + ) + } } override fun poll(): Int { @@ -169,7 +182,11 @@ internal object ServerHandshakePollers { return poller } - fun ip6(aeronDriver: AeronDriver, config: ServerConfiguration, server: Server): AeronPoller { + suspend fun ip6( + aeronDriver: AeronDriver, + config: ServerConfiguration, + server: Server): AeronPoller + { val logger = server.logger val connectionFunc = server.connectionFunc val handshake = server.handshake @@ -227,9 +244,11 @@ internal object ServerHandshakePollers { return@FragmentAssembler } - handshake.processUdpHandshakeMessageServer( - server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger - ) + runBlocking { + handshake.processUdpHandshakeMessageServer( + server, publication, remoteIpAndPort, message, aeronDriver, false, connectionFunc, logger + ) + } } override fun poll(): Int { @@ -250,7 +269,7 @@ internal object ServerHandshakePollers { return poller } - fun ip6Wildcard( + suspend fun ip6Wildcard( aeronDriver: AeronDriver, config: ServerConfiguration, server: Server @@ -312,9 +331,11 @@ internal object ServerHandshakePollers { return@FragmentAssembler } - handshake.processUdpHandshakeMessageServer( - server, publication, remoteIpAndPort, message, aeronDriver, true, connectionFunc, logger - ) + runBlocking { + handshake.processUdpHandshakeMessageServer( + server, publication, remoteIpAndPort, message, aeronDriver, true, connectionFunc, logger + ) + } } override fun poll(): Int {