Added sub/pub retries for both UDP and IPC
This commit is contained in:
parent
a82c5724f5
commit
d7eef22091
|
@ -44,6 +44,40 @@ interface MediaDriverConnection : AutoCloseable {
|
||||||
|
|
||||||
val isReliable: Boolean
|
val isReliable: Boolean
|
||||||
|
|
||||||
|
suspend fun addSubscriptionWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Subscription {
|
||||||
|
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
|
||||||
|
var count = 10
|
||||||
|
var exception: Exception? = null
|
||||||
|
while (count-- > 0) {
|
||||||
|
try {
|
||||||
|
return aeron.addSubscription(uri, streamId)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
exception = e
|
||||||
|
logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
||||||
|
delay(5000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw exception!!
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun addPublicationWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Publication {
|
||||||
|
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
|
||||||
|
var count = 10
|
||||||
|
var exception: Exception? = null
|
||||||
|
while (count-- > 0) {
|
||||||
|
try {
|
||||||
|
return aeron.addPublication(uri, streamId)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
exception = e
|
||||||
|
logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
||||||
|
delay(5_000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw exception!!
|
||||||
|
}
|
||||||
|
|
||||||
@Throws(ClientTimedOutException::class)
|
@Throws(ClientTimedOutException::class)
|
||||||
suspend fun buildClient(aeron: Aeron, logger: KLogger)
|
suspend fun buildClient(aeron: Aeron, logger: KLogger)
|
||||||
suspend fun buildServer(aeron: Aeron, logger: KLogger)
|
suspend fun buildServer(aeron: Aeron, logger: KLogger)
|
||||||
|
@ -120,8 +154,8 @@ class UdpMediaDriverConnection(override val address: InetAddress,
|
||||||
// publication of any state to other threads and not be long running or re-entrant with the client.
|
// publication of any state to other threads and not be long running or re-entrant with the client.
|
||||||
// on close, the publication CAN linger (in case a client goes away, and then comes back)
|
// on close, the publication CAN linger (in case a client goes away, and then comes back)
|
||||||
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
||||||
val publication = aeron.addPublication(publicationUri.build(), streamId)
|
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
|
||||||
val subscription = aeron.addSubscription(subscriptionUri.build(), streamId)
|
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
|
||||||
|
|
||||||
var success = false
|
var success = false
|
||||||
|
|
||||||
|
@ -197,29 +231,9 @@ class UdpMediaDriverConnection(override val address: InetAddress,
|
||||||
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
||||||
|
|
||||||
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
|
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
|
||||||
var count = 10
|
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
|
||||||
while (count-- > 0) {
|
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
|
||||||
try {
|
|
||||||
publication = aeron.addPublication(publicationUri.build(), streamId)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5_000)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
count = 10
|
|
||||||
while (count-- > 0) {
|
|
||||||
try {
|
|
||||||
subscription = aeron.addSubscription(subscriptionUri.build(), streamId)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
override fun clientInfo(): String {
|
override fun clientInfo(): String {
|
||||||
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
|
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
|
||||||
|
@ -306,27 +320,8 @@ class IpcMediaDriverConnection(override val streamId: Int,
|
||||||
// publication of any state to other threads and not be long running or re-entrant with the client.
|
// publication of any state to other threads and not be long running or re-entrant with the client.
|
||||||
|
|
||||||
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
|
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
|
||||||
var count = 10
|
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
|
||||||
while (count-- > 0) {
|
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
|
||||||
try {
|
|
||||||
publication = aeron.addPublication(publicationUri.build(), streamId)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5_000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
count = 10
|
|
||||||
while (count-- > 0) {
|
|
||||||
try {
|
|
||||||
subscription = aeron.addSubscription(subscriptionUri.build(), streamIdSubscription)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var success = false
|
var success = false
|
||||||
|
|
||||||
|
@ -367,9 +362,8 @@ class IpcMediaDriverConnection(override val streamId: Int,
|
||||||
}
|
}
|
||||||
|
|
||||||
this.success = true
|
this.success = true
|
||||||
|
|
||||||
this.subscription = subscription
|
|
||||||
this.publication = publication
|
this.publication = publication
|
||||||
|
this.subscription = subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
|
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
|
||||||
|
@ -393,27 +387,8 @@ class IpcMediaDriverConnection(override val streamId: Int,
|
||||||
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
||||||
|
|
||||||
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
|
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
|
||||||
var count = 10
|
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
|
||||||
while (count-- > 0) {
|
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
|
||||||
try {
|
|
||||||
publication = aeron.addPublication(publicationUri.build(), streamId)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5_000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
count = 10
|
|
||||||
while (count-- > 0) {
|
|
||||||
try {
|
|
||||||
subscription = aeron.addSubscription(subscriptionUri.build(), streamIdSubscription)
|
|
||||||
break
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.warn(e) { "Unable to add a publication to Aeron. Retrying $count more times..." }
|
|
||||||
delay(5000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clientInfo() : String {
|
override fun clientInfo() : String {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user