From e38ff52c083dc8ec4467362e62b21b97acaad106 Mon Sep 17 00:00:00 2001 From: Robinson Date: Sun, 26 Feb 2023 18:18:19 +0100 Subject: [PATCH] Updated pub/sub API --- src/dorkbox/network/aeron/AeronDriver.kt | 12 ++++++------ .../network/aeron/mediaDriver/ClientIpcDriver.kt | 4 ++-- .../network/aeron/mediaDriver/ClientUdpDriver.kt | 4 ++-- .../network/aeron/mediaDriver/ServerIpcDriver.kt | 2 +- .../aeron/mediaDriver/ServerIpcPairedDriver.kt | 4 ++-- .../network/aeron/mediaDriver/ServerUdpDriver.kt | 2 +- .../aeron/mediaDriver/ServerUdpPairedDriver.kt | 4 ++-- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index d96edae8..19f0e197 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -386,9 +386,9 @@ class AeronDriver( return true } - fun addPublication(publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { + fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { val uri = publicationUri.build() - logger.trace { "${this.type.simpleName} $type pub URI: $uri,stream-id=$streamId" } + logger.trace { "$type pub URI: $uri,stream-id=$streamId" } // reasons we cannot add a pub/sub to aeron // 1) the driver was closed @@ -432,9 +432,9 @@ class AeronDriver( /** * This is not a thread-safe publication! */ - fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { + fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { val uri = publicationUri.build() - logger.trace { "${this.type.simpleName} $type e-pub URI: $uri,stream-id=$streamId" } + logger.trace { "$type e-pub URI: $uri,stream-id=$streamId" } // reasons we cannot add a pub/sub to aeron // 1) the driver was closed @@ -476,9 +476,9 @@ class AeronDriver( return publication } - fun addSubscription(subscriptionUri: ChannelUriStringBuilder, type: String, streamId: Int): Subscription { + fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, type: String, streamId: Int): Subscription { val uri = subscriptionUri.build() - logger.trace { "${this.type.simpleName} $type sub URI: $uri,stream-id=$streamId" } + logger.trace { "$type sub URI: $uri,stream-id=$streamId" } // reasons we cannot add a pub/sub to aeron // 1) the driver was closed diff --git a/src/dorkbox/network/aeron/mediaDriver/ClientIpcDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ClientIpcDriver.kt index e2d2f55f..657fa409 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ClientIpcDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ClientIpcDriver.kt @@ -68,7 +68,7 @@ internal open class ClientIpcDriver(streamId: Int, // ESPECIALLY if it is with the same streamID // this check is in the "reconnect" logic - val publication = aeronDriver.addExclusivePublication(publicationUri, "IPC", streamId) + val publication = aeronDriver.addExclusivePublication(logger, publicationUri, "IPC", streamId) // always include the linger timeout, so we don't accidentally kill ourself by taking too long val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs() @@ -95,7 +95,7 @@ internal open class ClientIpcDriver(streamId: Int, // Create a subscription at the given address and port, using the given stream ID. val subscriptionUri = uri("ipc", sessionId) - val subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) + val subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId) this.info = if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { "[$sessionId] IPC connection established to [$streamId|$subscriptionPort]" diff --git a/src/dorkbox/network/aeron/mediaDriver/ClientUdpDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ClientUdpDriver.kt index 0b216558..c802dd3d 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ClientUdpDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ClientUdpDriver.kt @@ -78,7 +78,7 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri // For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions. // ESPECIALLY if it is with the same streamID. This was noticed as a problem with IPC - val publication = aeronDriver.addExclusivePublication(publicationUri, type, streamId) + val publication = aeronDriver.addExclusivePublication(logger, publicationUri, type, streamId) // this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces. @@ -100,7 +100,7 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri .controlEndpoint(isIpv4, addressString, port+1) .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) - val subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) + val subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId) // always include the linger timeout, so we don't accidentally kill ourselves by taking too long diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerIpcDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerIpcDriver.kt index 84374d41..33daf39e 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerIpcDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerIpcDriver.kt @@ -49,6 +49,6 @@ internal open class ServerIpcDriver(streamId: Int, success = true - subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) + subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId) } } diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerIpcPairedDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerIpcPairedDriver.kt index d938388e..43352155 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerIpcPairedDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerIpcPairedDriver.kt @@ -55,7 +55,7 @@ internal open class ServerIpcPairedDriver(streamId: Int, } this.success = true - this.subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) - this.publication = aeronDriver.addExclusivePublication(publicationUri, "IPC", streamId) + this.subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId) + this.publication = aeronDriver.addExclusivePublication(logger, publicationUri, "IPC", streamId) } } diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerUdpDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerUdpDriver.kt index d97794ca..c49ec766 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerUdpDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerUdpDriver.kt @@ -74,6 +74,6 @@ internal open class ServerUdpDriver(val listenAddress: InetAddress, } this.success = true - this.subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) + this.subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId) } } diff --git a/src/dorkbox/network/aeron/mediaDriver/ServerUdpPairedDriver.kt b/src/dorkbox/network/aeron/mediaDriver/ServerUdpPairedDriver.kt index 6fb0d925..c1bf4d13 100644 --- a/src/dorkbox/network/aeron/mediaDriver/ServerUdpPairedDriver.kt +++ b/src/dorkbox/network/aeron/mediaDriver/ServerUdpPairedDriver.kt @@ -71,7 +71,7 @@ internal class ServerUdpPairedDriver( .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) - val publication = aeronDriver.addExclusivePublication(publicationUri, type, streamId) + val publication = aeronDriver.addExclusivePublication(logger, publicationUri, type, streamId) // if we are IPv6 WILDCARD -- then our subscription must ALSO be IPv6, even if our connection is via IPv4 var subShouldBeIpv4 = isRemoteIpv4 @@ -103,7 +103,7 @@ internal class ServerUdpPairedDriver( .endpoint(subShouldBeIpv4, properSubAddress, port) - val subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) + val subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId) val remoteAddressString = if (isRemoteIpv4) { IPv4.toString(remoteAddress as Inet4Address)