When adding a pub/sub, guarantee that it is active and bound.

This commit is contained in:
Robinson 2023-07-01 11:34:00 +02:00
parent a41a6c6d62
commit ace2ac453b
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 61 additions and 12 deletions

View File

@ -251,6 +251,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
/**
* Add a [ConcurrentPublication] for publishing messages to subscribers.
*
* This guarantees that the publication is added and ACTIVE
*
* The publication returned is threadsafe.
*/
@Suppress("DEPRECATION")
@ -301,18 +303,35 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
registeredPublications.incrementAndGet()
var hasDelay = false
while (publication.channelStatus() != ChannelEndpointStatus.ACTIVE || publication.localSocketAddresses().isEmpty()) {
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
}
// the publication has not ACTUALLY been created yet!
delay(50)
}
if (hasDelay) {
logger.debug { "Aeron Driver [$driverId]: Delayed creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
}
registeredPublications++
if (logger.isTraceEnabled) {
registeredPublicationsTrace.add(publication.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating publication [$logInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
logger.trace { "Aeron Driver [$driverId]: Creating publication [$logInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
return publication
}
/**
* Add an [ExclusivePublication] for publishing messages to subscribers from a single thread.
*
* This guarantees that the publication is added and ACTIVE
*
* This is not a thread-safe publication!
*/
@Suppress("DEPRECATION")
@ -364,18 +383,34 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
registeredPublications.incrementAndGet()
var hasDelay = false
while (publication.channelStatus() != ChannelEndpointStatus.ACTIVE || publication.localSocketAddresses().isEmpty()) {
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
}
// the publication has not ACTUALLY been created yet!
delay(50)
}
if (hasDelay) {
logger.debug { "Aeron Driver [$driverId]: Delayed creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
}
registeredPublications++
if (logger.isTraceEnabled) {
registeredPublicationsTrace.add(publication.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $logInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $logInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
return publication
}
/**
* Add a new [Subscription] for subscribing to messages from publishers.
*
* This guarantees that the subscription is added and ACTIVE
*
* The method will set up the [Subscription] to use the
* {@link Aeron.Context#availableImageHandler(AvailableImageHandler)} and
* {@link Aeron.Context#unavailableImageHandler(UnavailableImageHandler)} from the {@link Aeron.Context}.
@ -430,12 +465,26 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
registeredSubscriptions.incrementAndGet()
var hasDelay = false
while (subscription.channelStatus() != ChannelEndpointStatus.ACTIVE || subscription.localSocketAddresses().isEmpty()) {
if (!hasDelay) {
hasDelay = true
logger.debug { "Aeron Driver [$driverId]: Delaying creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
}
// the subscription has not ACTUALLY been created yet!
delay(50)
}
if (hasDelay) {
logger.debug { "Aeron Driver [$driverId]: Delayed creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
}
registeredSubscriptions++
if (logger.isTraceEnabled) {
registeredSubscriptionsTrace.add(subscription.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}" }
logger.trace { "Aeron Driver [$driverId]: Creating subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}, channel=${subscription.channel()}" }
return subscription
}
@ -471,14 +520,14 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
if (publication is ConcurrentPublication) {
// aeron is async. close() doesn't immediately close, it just submits the close command!
// THIS CAN TAKE A WHILE TO ACTUALLY CLOSE!
while (publication.isConnected || aeron1.getPublication(registrationId) != null) {
delay(100)
while (publication.isConnected || publication.channelStatus() == ChannelEndpointStatus.ACTIVE || aeron1.getPublication(registrationId) != null) {
delay(50)
}
} else {
// aeron is async. close() doesn't immediately close, it just submits the close command!
// THIS CAN TAKE A WHILE TO ACTUALLY CLOSE!
while (publication.isConnected || aeron1.getExclusivePublication(registrationId) != null) {
delay(100)
while (publication.isConnected || publication.channelStatus() == ChannelEndpointStatus.ACTIVE || aeron1.getExclusivePublication(registrationId) != null) {
delay(50)
}
}
@ -512,8 +561,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
// aeron is async. close() doesn't immediately close, it just submits the close command!
// THIS CAN TAKE A WHILE TO ACTUALLY CLOSE!
while (subscription.isConnected || subscription.images().isNotEmpty()) {
delay(100)
while (subscription.isConnected || subscription.channelStatus() == ChannelEndpointStatus.ACTIVE || subscription.images().isNotEmpty()) {
delay(50)
}
// deleting log files is generally not recommended in a production environment as it can result in data loss and potential disruption of the messaging system!!