From 936a5e2d67d194401f56d646db59500ae461f30d Mon Sep 17 00:00:00 2001 From: Robinson Date: Sun, 23 Jul 2023 01:16:23 +0200 Subject: [PATCH] Added ability for subscription to wait for a publication to connect --- src/dorkbox/network/aeron/AeronDriver.kt | 32 ++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index dc298593..73239c4b 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -511,8 +511,6 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger * ESPECIALLY if it is with the same streamID * * The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - * - * this check is in the "reconnect" logic */ suspend fun waitForConnection( publication: Publication, @@ -541,6 +539,36 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger throw exception } + /** + * For subscriptions, in the client we want to guarantee that the remote server has connected BACK to us! + */ + suspend fun waitForConnection( + subscription: Subscription, + handshakeTimeoutNs: Long, + logInfo: String, + onErrorHandler: suspend (Throwable) -> Exception + ) { + if (subscription.isConnected) { + return + } + + val startTime = System.nanoTime() + + while (System.nanoTime() - startTime < handshakeTimeoutNs) { + if (subscription.isConnected && subscription.imageCount() > 0) { + return + } + + delay(200L) + } + + close(subscription, logInfo) + + val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Subscription timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${subscription.channel()} streamId=${subscription.streamId()}")) + exception.cleanAllStackTrace() + throw exception + } + /** * Add a [ConcurrentPublication] for publishing messages to subscribers. *