diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index 0ae79aff..3f72b13f 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -44,6 +44,40 @@ interface MediaDriverConnection : AutoCloseable { val isReliable: Boolean + suspend fun addSubscriptionWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Subscription { + // If we start/stop too quickly, we might have the address already in use! Retry a few times. + var count = 10 + var exception: Exception? = null + while (count-- > 0) { + try { + return aeron.addSubscription(uri, streamId) + } catch (e: Exception) { + exception = e + logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } + delay(5000) + } + } + + throw exception!! + } + + suspend fun addPublicationWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Publication { + // If we start/stop too quickly, we might have the address already in use! Retry a few times. + var count = 10 + var exception: Exception? = null + while (count-- > 0) { + try { + return aeron.addPublication(uri, streamId) + } catch (e: Exception) { + exception = e + logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } + delay(5_000) + } + } + + throw exception!! + } + @Throws(ClientTimedOutException::class) suspend fun buildClient(aeron: Aeron, logger: KLogger) suspend fun buildServer(aeron: Aeron, logger: KLogger) @@ -120,8 +154,8 @@ class UdpMediaDriverConnection(override val address: InetAddress, // publication of any state to other threads and not be long running or re-entrant with the client. // on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - val publication = aeron.addPublication(publicationUri.build(), streamId) - val subscription = aeron.addSubscription(subscriptionUri.build(), streamId) + val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) var success = false @@ -197,30 +231,10 @@ class UdpMediaDriverConnection(override val address: InetAddress, // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // If we start/stop too quickly, we might have the address already in use! Retry a few times. - var count = 10 - while (count-- > 0) { - try { - publication = aeron.addPublication(publicationUri.build(), streamId) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5_000) - } - } - - count = 10 - while (count-- > 0) { - try { - subscription = aeron.addSubscription(subscriptionUri.build(), streamId) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5000) - } - } + publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) } - override fun clientInfo(): String { return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" @@ -306,27 +320,8 @@ class IpcMediaDriverConnection(override val streamId: Int, // publication of any state to other threads and not be long running or re-entrant with the client. // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. - var count = 10 - while (count-- > 0) { - try { - publication = aeron.addPublication(publicationUri.build(), streamId) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5_000) - } - } - - count = 10 - while (count-- > 0) { - try { - subscription = aeron.addSubscription(subscriptionUri.build(), streamIdSubscription) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5000) - } - } + val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) var success = false @@ -367,9 +362,8 @@ class IpcMediaDriverConnection(override val streamId: Int, } this.success = true - - this.subscription = subscription this.publication = publication + this.subscription = subscription } override suspend fun buildServer(aeron: Aeron, logger: KLogger) { @@ -393,27 +387,8 @@ class IpcMediaDriverConnection(override val streamId: Int, // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. - var count = 10 - while (count-- > 0) { - try { - publication = aeron.addPublication(publicationUri.build(), streamId) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5_000) - } - } - - count = 10 - while (count-- > 0) { - try { - subscription = aeron.addSubscription(subscriptionUri.build(), streamIdSubscription) - break - } catch (e: Exception) { - logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5000) - } - } + publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) } override fun clientInfo() : String {