Added ability for subscription to wait for a publication to connect

This commit is contained in:
Robinson 2023-07-23 01:16:23 +02:00
parent 2d8956c78c
commit 936a5e2d67
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -511,8 +511,6 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
* ESPECIALLY if it is with the same streamID * ESPECIALLY if it is with the same streamID
* *
* The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. * The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
*
* this check is in the "reconnect" logic
*/ */
suspend fun waitForConnection( suspend fun waitForConnection(
publication: Publication, publication: Publication,
@ -541,6 +539,36 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
throw exception throw exception
} }
/**
* For subscriptions, in the client we want to guarantee that the remote server has connected BACK to us!
*/
suspend fun waitForConnection(
subscription: Subscription,
handshakeTimeoutNs: Long,
logInfo: String,
onErrorHandler: suspend (Throwable) -> Exception
) {
if (subscription.isConnected) {
return
}
val startTime = System.nanoTime()
while (System.nanoTime() - startTime < handshakeTimeoutNs) {
if (subscription.isConnected && subscription.imageCount() > 0) {
return
}
delay(200L)
}
close(subscription, logInfo)
val exception = onErrorHandler(Exception("Aeron Driver [${internal.driverId}]: Subscription timed out in ${Sys.getTimePrettyFull(handshakeTimeoutNs)} while waiting for connection state: ${subscription.channel()} streamId=${subscription.streamId()}"))
exception.cleanAllStackTrace()
throw exception
}
/** /**
* Add a [ConcurrentPublication] for publishing messages to subscribers. * Add a [ConcurrentPublication] for publishing messages to subscribers.
* *