Cleaned up adding/removing pub/sub

This commit is contained in:
Robinson 2023-05-28 16:43:57 +02:00
parent 080e27d6ad
commit 2dd7aa8bc0
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 98 additions and 69 deletions

View File

@ -319,50 +319,38 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
}
fun addPublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication =
internal.addPublication(logger, publicationUri, aeronLogInfo, streamId)
suspend fun addPublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication {
return internal.addPublication(logger, publicationUri, aeronLogInfo, streamId)
}
/**
* This is not a thread-safe publication!
*/
fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication =
internal.addExclusivePublication(logger, publicationUri, aeronLogInfo, streamId)
suspend fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication {
return internal.addExclusivePublication(logger, publicationUri, aeronLogInfo, streamId)
}
fun addSubscription(subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription =
internal.addSubscription(logger, subscriptionUri, aeronLogInfo, streamId)
suspend fun addSubscription(subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription {
return internal.addSubscription(logger, subscriptionUri, aeronLogInfo, streamId)
}
/**
* Guarantee that the publication is closed AND the backing file is removed
*
* This can throw exceptions!
*/
suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String) =
suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String) {
internal.closeAndDeletePublication(publication, aeronLogInfo, logger)
}
/**
* Guarantee that the publication is closed AND the backing file is removed
*
* This can throw exceptions!
*/
internal suspend fun closeAndDeletePublication(connection: ClientHandshakeDriver) =
internal.closeAndDeletePublication(connection.publication, connection.logInfo, logger)
/**
* Guarantee that the publication is closed AND the backing file is removed
*
* This can throw exceptions!
*/
internal suspend fun closeAndDeleteSubscription(connection: ClientHandshakeDriver) =
internal.closeAndDeleteSubscription(connection.subscription, connection.logInfo, logger)
/**
* Guarantee that the publication is closed AND the backing file is removed
*
* This can throw exceptions!
*/
suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String) =
suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String) {
internal.closeAndDeleteSubscription(subscription, aeronLogInfo, logger)
}
/**

View File

@ -16,12 +16,12 @@
package dorkbox.network.aeron
import dorkbox.collections.LockFreeHashSet
import dorkbox.network.Configuration
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
import dorkbox.network.exceptions.AeronDriverException
import dorkbox.network.exceptions.ClientRetryException
import io.aeron.Aeron
@ -38,7 +38,7 @@ import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.concurrent.BackoffIdleStrategy
import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: Configuration.MediaDriverConfig) {
companion object {
@ -102,7 +102,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
private val context: AeronContext
private val aeronErrorHandler: (Throwable) -> Unit
private val pubSubs = atomic(0)
private val registeredPublications = atomic(0)
private val registeredSubscriptions = atomic(0)
private val registeredPublicationsTrace: LockFreeHashSet<Long> = LockFreeHashSet()
private val registeredSubscriptionsTrace: LockFreeHashSet<Long> = LockFreeHashSet()
private val stateMutex = Mutex()
@ -240,7 +244,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
}
fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication {
suspend fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = stateMutex.withLock {
val uri = publicationUri.build()
// reasons we cannot add a pub/sub to aeron
@ -279,16 +283,19 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
pubSubs.incrementAndGet()
// logger.trace { "Creating publication ${publication.registrationId()}" }
logger.trace { "Creating publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
registeredPublications.incrementAndGet()
if (logger.isTraceEnabled) {
registeredPublicationsTrace.add(publication.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
return publication
}
/**
* This is not a thread-safe publication!
*/
fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication {
suspend fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = stateMutex.withLock {
val uri = publicationUri.build()
// reasons we cannot add a pub/sub to aeron
@ -314,22 +321,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
aeron1.addExclusivePublication(uri, streamId)
} catch (e: Exception) {
// this happens if the aeron media driver cannot actually establish connection... OR IF IT IS TOO FAST BETWEEN ADD AND REMOVE FOR THE SAME SESSION/STREAM ID!
Thread.sleep(10000)
try {
println()
println("HAD TO WAIT FOR PUBLICATION!")
println()
println()
aeron1.addExclusivePublication(uri, streamId)
} catch (e: Exception) {
e.cleanStackTraceInternal()
e.cause?.cleanStackTraceInternal()
val ex = ClientRetryException("Error adding a publication", e)
e.cleanAllStackTrace()
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication", e)
ex.cleanAllStackTrace()
throw ex
}
}
if (publication == null) {
// there was an error connecting to the aeron client or media driver.
@ -338,13 +334,16 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
pubSubs.incrementAndGet()
// logger.trace { "Creating ex-publication ${publication.registrationId()} :: $type e-pub URI: $uri,stream-id=$streamId" }
logger.trace { "Creating ex-publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
registeredPublications.incrementAndGet()
if (logger.isTraceEnabled) {
registeredPublicationsTrace.add(publication.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
return publication
}
fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription {
suspend fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription = stateMutex.withLock {
val uri = subscriptionUri.build()
// reasons we cannot add a pub/sub to aeron
@ -386,17 +385,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
throw ex
}
pubSubs.incrementAndGet()
// logger.trace { "Creating subscription ${subscription.registrationId()} :: $type sub URI: $uri,stream-id=$streamId" }
logger.trace { "Creating subscription [$aeronLogInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}" }
registeredSubscriptions.incrementAndGet()
if (logger.isTraceEnabled) {
registeredSubscriptionsTrace.add(subscription.registrationId())
}
logger.trace { "Aeron Driver [$driverId]: Creating subscription [$aeronLogInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}" }
return subscription
}
/**
* Guarantee that the publication is closed AND the backing file is removed
*/
suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String, logger: KLogger) {
logger.trace { "Closing publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String, logger: KLogger) = stateMutex.withLock {
logger.trace { "Aeron Driver [$driverId]: Closing publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
try {
// This can throw exceptions!
@ -406,6 +408,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
}
ensureLogfileDeleted("publication", getMediaDriverFile(publication), aeronLogInfo, logger)
registeredPublications.decrementAndGet()
if (logger.isTraceEnabled) {
registeredPublicationsTrace.remove(publication.registrationId())
}
}
/**
@ -418,16 +425,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
// This can throw exceptions!
subscription.close()
} catch (e: Exception) {
// wait 3 seconds and try again!
//
// try {
// subscription.close()
// } catch (e: Exception) {
logger.error(e) { "Unable to close [$aeronLogInfo] subscription $subscription" }
// }
logger.error(e) { "Aeron Driver [$driverId]: Unable to close [$aeronLogInfo] subscription $subscription" }
}
ensureLogfileDeleted("subscription", getMediaDriverFile(subscription), aeronLogInfo, logger)
registeredSubscriptions.decrementAndGet()
if (logger.isTraceEnabled) {
registeredSubscriptionsTrace.remove(subscription.registrationId())
}
}
private suspend fun ensureLogfileDeleted(type: String, logFile: File, aeronLogInfo: String, logger: KLogger) {
@ -444,8 +450,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
if (logFile.exists()) {
logger.error("Aeron Driver [$driverId]: [$aeronLogInfo] Unable to delete aeron $type log on close: $logFile")
}
pubSubs.decrementAndGet()
}
/**
@ -492,8 +496,25 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
}
val driverId = config.id
if (pubSubs.value > 0) {
logger.debug { "Aeron driver [$driverId] still has ${pubSubs.value} pub/subs" }
if (registeredPublications.value > 0) {
if (logger.isTraceEnabled) {
val elements = registeredPublicationsTrace.elements()
val joined = elements.joinToString()
logger.debug { "Aeron Driver [$driverId]: has [$joined] publications (${elements.size} total)" }
} else {
logger.debug { "Aeron Driver [$driverId]: has publications (${registeredPublications.value} total)" }
}
return true
}
if (registeredSubscriptions.value > 0) {
if (logger.isTraceEnabled) {
val elements = registeredSubscriptionsTrace.elements()
val joined = elements.joinToString()
logger.debug { "Aeron Driver [$driverId]: has [$joined] subscriptions (${elements.size} total)" }
} else {
logger.debug { "Aeron Driver [$driverId]: has subscriptions (${registeredSubscriptions.value} total)" }
}
return true
}
@ -633,7 +654,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
logger.error { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"}
}
logger.debug { "Closed the media driver [$driverId] at '${driverDirectory}'" }
logger.debug { "Aeron Driver [$driverId]: Closed the media driver at '${driverDirectory}'" }
// reset our contextDefine value, so that this configuration can safely be reused
config.contextDefined = false
closed = true
}
@ -740,4 +765,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
val lingerNs = getLingerNs()
delay(driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds((lingerNs * multiplier.toDouble()).toLong())) )
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is AeronDriverInternal) return false
if (!super.equals(other)) return false
return driverId == other.driverId
}
override fun hashCode(): Int {
return driverId
}
override fun toString(): String {
return "Aeron Driver [${driverId}]"
}
}