Updated pub/sub API

This commit is contained in:
Robinson 2023-02-26 18:18:19 +01:00
parent 912a2f4c2a
commit e38ff52c08
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
7 changed files with 16 additions and 16 deletions

View File

@ -386,9 +386,9 @@ class AeronDriver(
return true return true
} }
fun addPublication(publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication {
val uri = publicationUri.build() val uri = publicationUri.build()
logger.trace { "${this.type.simpleName} $type pub URI: $uri,stream-id=$streamId" } logger.trace { "$type pub URI: $uri,stream-id=$streamId" }
// reasons we cannot add a pub/sub to aeron // reasons we cannot add a pub/sub to aeron
// 1) the driver was closed // 1) the driver was closed
@ -432,9 +432,9 @@ class AeronDriver(
/** /**
* This is not a thread-safe publication! * This is not a thread-safe publication!
*/ */
fun addExclusivePublication(publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication { fun addExclusivePublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, type: String, streamId: Int): Publication {
val uri = publicationUri.build() val uri = publicationUri.build()
logger.trace { "${this.type.simpleName} $type e-pub URI: $uri,stream-id=$streamId" } logger.trace { "$type e-pub URI: $uri,stream-id=$streamId" }
// reasons we cannot add a pub/sub to aeron // reasons we cannot add a pub/sub to aeron
// 1) the driver was closed // 1) the driver was closed
@ -476,9 +476,9 @@ class AeronDriver(
return publication return publication
} }
fun addSubscription(subscriptionUri: ChannelUriStringBuilder, type: String, streamId: Int): Subscription { fun addSubscription(logger: KLogger, subscriptionUri: ChannelUriStringBuilder, type: String, streamId: Int): Subscription {
val uri = subscriptionUri.build() val uri = subscriptionUri.build()
logger.trace { "${this.type.simpleName} $type sub URI: $uri,stream-id=$streamId" } logger.trace { "$type sub URI: $uri,stream-id=$streamId" }
// reasons we cannot add a pub/sub to aeron // reasons we cannot add a pub/sub to aeron
// 1) the driver was closed // 1) the driver was closed

View File

@ -68,7 +68,7 @@ internal open class ClientIpcDriver(streamId: Int,
// ESPECIALLY if it is with the same streamID // ESPECIALLY if it is with the same streamID
// this check is in the "reconnect" logic // this check is in the "reconnect" logic
val publication = aeronDriver.addExclusivePublication(publicationUri, "IPC", streamId) val publication = aeronDriver.addExclusivePublication(logger, publicationUri, "IPC", streamId)
// always include the linger timeout, so we don't accidentally kill ourself by taking too long // always include the linger timeout, so we don't accidentally kill ourself by taking too long
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs() val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs()
@ -95,7 +95,7 @@ internal open class ClientIpcDriver(streamId: Int,
// Create a subscription at the given address and port, using the given stream ID. // Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uri("ipc", sessionId) val subscriptionUri = uri("ipc", sessionId)
val subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) val subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId)
this.info = if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { this.info = if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC connection established to [$streamId|$subscriptionPort]" "[$sessionId] IPC connection established to [$streamId|$subscriptionPort]"

View File

@ -78,7 +78,7 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri
// For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions. // For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions.
// ESPECIALLY if it is with the same streamID. This was noticed as a problem with IPC // ESPECIALLY if it is with the same streamID. This was noticed as a problem with IPC
val publication = aeronDriver.addExclusivePublication(publicationUri, type, streamId) val publication = aeronDriver.addExclusivePublication(logger, publicationUri, type, streamId)
// this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces. // this will cause us to listen on the interface that connects with the remote address, instead of ALL interfaces.
@ -100,7 +100,7 @@ internal class ClientUdpDriver(val address: InetAddress, val addressString: Stri
.controlEndpoint(isIpv4, addressString, port+1) .controlEndpoint(isIpv4, addressString, port+1)
.controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
val subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) val subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId)
// always include the linger timeout, so we don't accidentally kill ourselves by taking too long // always include the linger timeout, so we don't accidentally kill ourselves by taking too long

View File

@ -49,6 +49,6 @@ internal open class ServerIpcDriver(streamId: Int,
success = true success = true
subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId)
} }
} }

View File

@ -55,7 +55,7 @@ internal open class ServerIpcPairedDriver(streamId: Int,
} }
this.success = true this.success = true
this.subscription = aeronDriver.addSubscription(subscriptionUri, "IPC", streamId) this.subscription = aeronDriver.addSubscription(logger, subscriptionUri, "IPC", streamId)
this.publication = aeronDriver.addExclusivePublication(publicationUri, "IPC", streamId) this.publication = aeronDriver.addExclusivePublication(logger, publicationUri, "IPC", streamId)
} }
} }

View File

@ -74,6 +74,6 @@ internal open class ServerUdpDriver(val listenAddress: InetAddress,
} }
this.success = true this.success = true
this.subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) this.subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId)
} }
} }

View File

@ -71,7 +71,7 @@ internal class ServerUdpPairedDriver(
.controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
val publication = aeronDriver.addExclusivePublication(publicationUri, type, streamId) val publication = aeronDriver.addExclusivePublication(logger, publicationUri, type, streamId)
// if we are IPv6 WILDCARD -- then our subscription must ALSO be IPv6, even if our connection is via IPv4 // if we are IPv6 WILDCARD -- then our subscription must ALSO be IPv6, even if our connection is via IPv4
var subShouldBeIpv4 = isRemoteIpv4 var subShouldBeIpv4 = isRemoteIpv4
@ -103,7 +103,7 @@ internal class ServerUdpPairedDriver(
.endpoint(subShouldBeIpv4, properSubAddress, port) .endpoint(subShouldBeIpv4, properSubAddress, port)
val subscription = aeronDriver.addSubscription(subscriptionUri, type, streamId) val subscription = aeronDriver.addSubscription(logger, subscriptionUri, type, streamId)
val remoteAddressString = if (isRemoteIpv4) { val remoteAddressString = if (isRemoteIpv4) {
IPv4.toString(remoteAddress as Inet4Address) IPv4.toString(remoteAddress as Inet4Address)