diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index 248be49e..a63957eb 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -319,50 +319,38 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger } - fun addPublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = - internal.addPublication(logger, publicationUri, aeronLogInfo, streamId) + suspend fun addPublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication { + return internal.addPublication(logger, publicationUri, aeronLogInfo, streamId) + } /** * This is not a thread-safe publication! */ - fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = - internal.addExclusivePublication(logger, publicationUri, aeronLogInfo, streamId) + suspend fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication { + return internal.addExclusivePublication(logger, publicationUri, aeronLogInfo, streamId) + } - fun addSubscription(subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription = - internal.addSubscription(logger, subscriptionUri, aeronLogInfo, streamId) + suspend fun addSubscription(subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription { + return internal.addSubscription(logger, subscriptionUri, aeronLogInfo, streamId) + } /** * Guarantee that the publication is closed AND the backing file is removed * * This can throw exceptions! */ - suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String) = + suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String) { internal.closeAndDeletePublication(publication, aeronLogInfo, logger) + } /** * Guarantee that the publication is closed AND the backing file is removed * * This can throw exceptions! */ - internal suspend fun closeAndDeletePublication(connection: ClientHandshakeDriver) = - internal.closeAndDeletePublication(connection.publication, connection.logInfo, logger) - - - /** - * Guarantee that the publication is closed AND the backing file is removed - * - * This can throw exceptions! - */ - internal suspend fun closeAndDeleteSubscription(connection: ClientHandshakeDriver) = - internal.closeAndDeleteSubscription(connection.subscription, connection.logInfo, logger) - - /** - * Guarantee that the publication is closed AND the backing file is removed - * - * This can throw exceptions! - */ - suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String) = + suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String) { internal.closeAndDeleteSubscription(subscription, aeronLogInfo, logger) + } /** diff --git a/src/dorkbox/network/aeron/AeronDriverInternal.kt b/src/dorkbox/network/aeron/AeronDriverInternal.kt index f8bb667c..957de0c7 100644 --- a/src/dorkbox/network/aeron/AeronDriverInternal.kt +++ b/src/dorkbox/network/aeron/AeronDriverInternal.kt @@ -16,12 +16,12 @@ package dorkbox.network.aeron +import dorkbox.collections.LockFreeHashSet import dorkbox.network.Configuration import dorkbox.network.connection.EndPoint import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace -import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal import dorkbox.network.exceptions.AeronDriverException import dorkbox.network.exceptions.ClientRetryException import io.aeron.Aeron @@ -38,7 +38,7 @@ import mu.KotlinLogging import org.agrona.DirectBuffer import org.agrona.concurrent.BackoffIdleStrategy import java.io.File -import java.util.concurrent.TimeUnit +import java.util.concurrent.* internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: Configuration.MediaDriverConfig) { companion object { @@ -102,7 +102,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C private val context: AeronContext private val aeronErrorHandler: (Throwable) -> Unit - private val pubSubs = atomic(0) + private val registeredPublications = atomic(0) + private val registeredSubscriptions = atomic(0) + + private val registeredPublicationsTrace: LockFreeHashSet = LockFreeHashSet() + private val registeredSubscriptionsTrace: LockFreeHashSet = LockFreeHashSet() private val stateMutex = Mutex() @@ -240,7 +244,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } - fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication { + suspend fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = stateMutex.withLock { val uri = publicationUri.build() // reasons we cannot add a pub/sub to aeron @@ -279,16 +283,19 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C throw ex } - pubSubs.incrementAndGet() -// logger.trace { "Creating publication ${publication.registrationId()}" } - logger.trace { "Creating publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } + registeredPublications.incrementAndGet() + if (logger.isTraceEnabled) { + registeredPublicationsTrace.add(publication.registrationId()) + } + + logger.trace { "Aeron Driver [$driverId]: Creating publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } return publication } /** * This is not a thread-safe publication! */ - fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication { + suspend fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication = stateMutex.withLock { val uri = publicationUri.build() // reasons we cannot add a pub/sub to aeron @@ -314,21 +321,10 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C aeron1.addExclusivePublication(uri, streamId) } catch (e: Exception) { // this happens if the aeron media driver cannot actually establish connection... OR IF IT IS TOO FAST BETWEEN ADD AND REMOVE FOR THE SAME SESSION/STREAM ID! - Thread.sleep(10000) - try { - println() - println("HAD TO WAIT FOR PUBLICATION!") - println() - println() - - aeron1.addExclusivePublication(uri, streamId) - } catch (e: Exception) { - e.cleanStackTraceInternal() - e.cause?.cleanStackTraceInternal() - val ex = ClientRetryException("Error adding a publication", e) - ex.cleanAllStackTrace() - throw ex - } + e.cleanAllStackTrace() + val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication", e) + ex.cleanAllStackTrace() + throw ex } if (publication == null) { @@ -338,13 +334,16 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C throw ex } - pubSubs.incrementAndGet() -// logger.trace { "Creating ex-publication ${publication.registrationId()} :: $type e-pub URI: $uri,stream-id=$streamId" } - logger.trace { "Creating ex-publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } + registeredPublications.incrementAndGet() + if (logger.isTraceEnabled) { + registeredPublicationsTrace.add(publication.registrationId()) + } + + logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $aeronLogInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } return publication } - fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription { + suspend fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Subscription = stateMutex.withLock { val uri = subscriptionUri.build() // reasons we cannot add a pub/sub to aeron @@ -386,17 +385,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C throw ex } - pubSubs.incrementAndGet() -// logger.trace { "Creating subscription ${subscription.registrationId()} :: $type sub URI: $uri,stream-id=$streamId" } - logger.trace { "Creating subscription [$aeronLogInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}" } + registeredSubscriptions.incrementAndGet() + if (logger.isTraceEnabled) { + registeredSubscriptionsTrace.add(subscription.registrationId()) + } + + logger.trace { "Aeron Driver [$driverId]: Creating subscription [$aeronLogInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}" } return subscription } /** * Guarantee that the publication is closed AND the backing file is removed */ - suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String, logger: KLogger) { - logger.trace { "Closing publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } + suspend fun closeAndDeletePublication(publication: Publication, aeronLogInfo: String, logger: KLogger) = stateMutex.withLock { + logger.trace { "Aeron Driver [$driverId]: Closing publication [$aeronLogInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" } try { // This can throw exceptions! @@ -406,6 +408,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } ensureLogfileDeleted("publication", getMediaDriverFile(publication), aeronLogInfo, logger) + + registeredPublications.decrementAndGet() + if (logger.isTraceEnabled) { + registeredPublicationsTrace.remove(publication.registrationId()) + } } /** @@ -418,16 +425,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C // This can throw exceptions! subscription.close() } catch (e: Exception) { - // wait 3 seconds and try again! -// -// try { -// subscription.close() -// } catch (e: Exception) { - logger.error(e) { "Unable to close [$aeronLogInfo] subscription $subscription" } -// } + logger.error(e) { "Aeron Driver [$driverId]: Unable to close [$aeronLogInfo] subscription $subscription" } } ensureLogfileDeleted("subscription", getMediaDriverFile(subscription), aeronLogInfo, logger) + + registeredSubscriptions.decrementAndGet() + if (logger.isTraceEnabled) { + registeredSubscriptionsTrace.remove(subscription.registrationId()) + } } private suspend fun ensureLogfileDeleted(type: String, logFile: File, aeronLogInfo: String, logger: KLogger) { @@ -444,8 +450,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C if (logFile.exists()) { logger.error("Aeron Driver [$driverId]: [$aeronLogInfo] Unable to delete aeron $type log on close: $logFile") } - - pubSubs.decrementAndGet() } /** @@ -492,8 +496,25 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C } val driverId = config.id - if (pubSubs.value > 0) { - logger.debug { "Aeron driver [$driverId] still has ${pubSubs.value} pub/subs" } + if (registeredPublications.value > 0) { + if (logger.isTraceEnabled) { + val elements = registeredPublicationsTrace.elements() + val joined = elements.joinToString() + logger.debug { "Aeron Driver [$driverId]: has [$joined] publications (${elements.size} total)" } + } else { + logger.debug { "Aeron Driver [$driverId]: has publications (${registeredPublications.value} total)" } + } + return true + } + + if (registeredSubscriptions.value > 0) { + if (logger.isTraceEnabled) { + val elements = registeredSubscriptionsTrace.elements() + val joined = elements.joinToString() + logger.debug { "Aeron Driver [$driverId]: has [$joined] subscriptions (${elements.size} total)" } + } else { + logger.debug { "Aeron Driver [$driverId]: has subscriptions (${registeredSubscriptions.value} total)" } + } return true } @@ -633,7 +654,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C logger.error { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"} } - logger.debug { "Closed the media driver [$driverId] at '${driverDirectory}'" } + logger.debug { "Aeron Driver [$driverId]: Closed the media driver at '${driverDirectory}'" } + + // reset our contextDefine value, so that this configuration can safely be reused + config.contextDefined = false + closed = true } @@ -740,4 +765,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C val lingerNs = getLingerNs() delay(driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds((lingerNs * multiplier.toDouble()).toLong())) ) } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is AeronDriverInternal) return false + if (!super.equals(other)) return false + + return driverId == other.driverId + } + + override fun hashCode(): Int { + return driverId + } + + override fun toString(): String { + return "Aeron Driver [${driverId}]" + } }