Added extra checks when adding pub/sub for when there is an ERRORED state

This commit is contained in:
Robinson 2023-08-07 22:30:53 -06:00
parent 296c600245
commit 9a30c031ef
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 28 additions and 0 deletions

View File

@ -313,6 +313,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
var hasDelay = false
while (publication.channelStatus() != ChannelEndpointStatus.ACTIVE || (!isIpc && publication.localSocketAddresses().isEmpty())) {
if (publication.channelStatus() == ChannelEndpointStatus.ERRORED) {
logger.error { "Aeron Driver [$driverId]: Error creating publication (has errors) $logInfo :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
// there was an error connecting to the aeron client or media driver.
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an publication")
ex.cleanAllStackTrace()
throw ex
}
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
@ -399,6 +408,16 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
var hasDelay = false
while (publication.channelStatus() != ChannelEndpointStatus.ACTIVE || (!isIpc && publication.localSocketAddresses().isEmpty())) {
if (publication.channelStatus() == ChannelEndpointStatus.ERRORED) {
logger.error { "Aeron Driver [$driverId]: Error creating ex-publication (has errors) $logInfo :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
// there was an error connecting to the aeron client or media driver.
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication")
ex.cleanAllStackTrace()
throw ex
}
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
@ -487,6 +506,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
var hasDelay = false
while (subscription.channelStatus() != ChannelEndpointStatus.ACTIVE || (!isIpc && subscription.localSocketAddresses().isEmpty())) {
if (subscription.channelStatus() == ChannelEndpointStatus.ERRORED) {
logger.error { "Aeron Driver [$driverId]: Error creating subscription (has errors) $logInfo :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
// there was an error connecting to the aeron client or media driver.
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an subscription")
ex.cleanAllStackTrace()
throw ex
}
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }