From 1765b4e3679dc2d6d9868f59733953d0f9aacb82 Mon Sep 17 00:00:00 2001 From: Robinson Date: Tue, 27 Apr 2021 10:26:37 +0200 Subject: [PATCH] Fixed server-client pair connection address info --- .../network/aeron/IpcMediaDriverConnection.kt | 171 +++++++++ .../network/aeron/MediaDriverConnection.kt | 343 +----------------- .../aeron/UdpMediaDriverClientConnection.kt | 177 +++++++++ .../aeron/UdpMediaDriverPairedConnection.kt | 39 ++ .../aeron/UdpMediaDriverServerConnection.kt | 139 +++++++ src/dorkbox/network/connection/Connection.kt | 25 +- 6 files changed, 546 insertions(+), 348 deletions(-) create mode 100644 src/dorkbox/network/aeron/IpcMediaDriverConnection.kt create mode 100644 src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt create mode 100644 src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt create mode 100644 src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt diff --git a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt new file mode 100644 index 00000000..fc043744 --- /dev/null +++ b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt @@ -0,0 +1,171 @@ +/* + * Copyright 2021 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.aeron + +import dorkbox.network.exceptions.ClientTimedOutException +import io.aeron.Aeron +import io.aeron.ChannelUriStringBuilder +import kotlinx.coroutines.delay +import mu.KLogger + +/** + * For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER + * NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast + */ +internal class IpcMediaDriverConnection(streamId: Int, + val streamIdSubscription: Int, + sessionId: Int, + ) : + MediaDriverConnection(0, 0, streamId, sessionId, 1_000, true) { + + var success: Boolean = false + + private fun uri(): ChannelUriStringBuilder { + val builder = ChannelUriStringBuilder().media("ipc") + if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + builder.sessionId(sessionId) + } + + return builder + } + + /** + * Set up the subscription + publication channels to the server + * + * @throws ClientTimedOutException if we cannot connect to the server in the designated time + */ + override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + // Create a publication at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + + + if (logger.isTraceEnabled) { + logger.trace("IPC client pub URI: ${publicationUri.build()}") + logger.trace("IPC server sub URI: ${subscriptionUri.build()}") + } + + // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe + // publication of any state to other threads and not be long running or re-entrant with the client. + + // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. + val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) + + var success = false + + // this will wait for the server to acknowledge the connection (all via aeron) + var startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { + if (subscription.isConnected && subscription.imageCount() > 0) { + success = true + break + } + + delay(timeMillis = 100L) + } + + if (!success) { + subscription.close() + throw ClientTimedOutException("Creating subscription connection to aeron") + } + + + success = false + + // this will wait for the server to acknowledge the connection (all via aeron) + startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { + if (publication.isConnected) { + success = true + break + } + + delay(timeMillis = 100L) + } + + if (!success) { + subscription.close() + publication.close() + throw ClientTimedOutException("Creating publication connection to aeron") + } + + this.success = true + this.publication = publication + this.subscription = subscription + } + + /** + * Setup the subscription + publication channels on the server. + * + * serverAddress is ignored for IPC + */ + override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + + + if (logger.isTraceEnabled) { + logger.trace("IPC server pub URI: ${publicationUri.build()}") + logger.trace("IPC server sub URI: ${subscriptionUri.build()}") + } + + // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe + // publication of any state to other threads and not be long running or re-entrant with the client. + + // on close, the publication CAN linger (in case a client goes away, and then comes back) + // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) + + // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. + publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) + } + + override fun clientInfo() : String { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + "[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]" + } else { + "Connecting handshake to IPC [$streamIdSubscription|$streamId]" + } + } + + override fun serverInfo() : String { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + "[$sessionId] IPC listening on [$streamIdSubscription|$streamId] " + } else { + "Listening handshake on IPC [$streamIdSubscription|$streamId]" + } + } + + override fun close() { + if (success) { + subscription.close() + publication.close() + } + } + + override fun toString(): String { + return "[$streamIdSubscription|$streamId] [$sessionId]" + } +} diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index dd851a17..c301715a 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -17,22 +17,14 @@ package dorkbox.network.aeron -import dorkbox.netUtil.IP -import dorkbox.netUtil.IPv4 -import dorkbox.netUtil.IPv6 import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.Aeron -import io.aeron.ChannelUriStringBuilder import io.aeron.Publication import io.aeron.Subscription import kotlinx.coroutines.delay import mu.KLogger -import java.net.Inet4Address -import java.net.InetAddress -import java.util.concurrent.TimeUnit -abstract class MediaDriverConnection(val address: InetAddress?, - val publicationPort: Int, val subscriptionPort: Int, +abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionPort: Int, val streamId: Int, val sessionId: Int, val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable { @@ -81,336 +73,3 @@ abstract class MediaDriverConnection(val address: InetAddress?, abstract fun clientInfo() : String abstract fun serverInfo() : String } - -/** - * For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER. - * A connection timeout of 0, means to wait forever - */ -class UdpMediaDriverConnection(address: InetAddress, - publicationPort: Int, - subscriptionPort: Int, - streamId: Int, - sessionId: Int, - connectionTimeoutMS: Long = 0, - isReliable: Boolean = true) : - MediaDriverConnection(address, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { - - var success: Boolean = false - - val addressString: String by lazy { - if (address is Inet4Address) { - address.hostAddress - } else { - // IPv6 requires the address to be bracketed by [...] - val host = address.hostAddress - if (host[0] == '[') { - host - } else { - "[${address.hostAddress}]" - } - } - } - - private fun uri(): ChannelUriStringBuilder { - val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") - if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - builder.sessionId(sessionId) - } - - return builder - } - - @Suppress("DuplicatedCode") - override suspend fun buildClient(aeron: Aeron, logger: KLogger) { - // Create a publication at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - .endpoint("$addressString:$publicationPort") - - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - .controlEndpoint("$addressString:$subscriptionPort") - .controlMode("dynamic") - - - - if (logger.isTraceEnabled) { - if (address is Inet4Address) { - logger.trace("IPV4 client pub URI: ${publicationUri.build()}") - logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}") - } else { - logger.trace("IPV6 client pub URI: ${publicationUri.build()}") - logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}") - } - } - - // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe - // publication of any state to other threads and not be long running or re-entrant with the client. - // on close, the publication CAN linger (in case a client goes away, and then comes back) - // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) - - var success = false - - // this will wait for the server to acknowledge the connection (all via aeron) - val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS) - var startTime = System.nanoTime() - while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { - if (subscription.isConnected) { - success = true - break - } - - delay(timeMillis = 100L) - } - - if (!success) { - subscription.close() - throw ClientTimedOutException("Cannot create subscription!") - } - - - success = false - - // this will wait for the server to acknowledge the connection (all via aeron) - startTime = System.nanoTime() - while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { - if (publication.isConnected) { - success = true - break - } - - delay(timeMillis = 100L) - } - - if (!success) { - subscription.close() - publication.close() - throw ClientTimedOutException("Creating publication connection to aeron") - } - - this.success = true - - this.publication = publication - this.subscription = subscription - } - - override suspend fun buildServer(aeron: Aeron, logger: KLogger) { - // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - .controlEndpoint("$addressString:$publicationPort") - .controlMode("dynamic") - - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - .endpoint("$addressString:$subscriptionPort") - - - - if (logger.isTraceEnabled) { - if (address is Inet4Address) { - logger.trace("IPV4 server pub URI: ${publicationUri.build()}") - logger.trace("IPV4 server sub URI: ${subscriptionUri.build()}") - } else { - logger.trace("IPV6 server pub URI: ${publicationUri.build()}") - logger.trace("IPV6 server sub URI: ${subscriptionUri.build()}") - } - } - - // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe - // publication of any state to other threads and not be long running or re-entrant with the client. - // on close, the publication CAN linger (in case a client goes away, and then comes back) - // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - - // If we start/stop too quickly, we might have the address already in use! Retry a few times. - publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) - } - - override fun clientInfo(): String { - address!! - - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" - } else { - "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" - } - } - - override fun serverInfo(): String { - val address = if (address == IPv4.WILDCARD || address == IPv6.WILDCARD) { - if (address == IPv4.WILDCARD) { - address.hostAddress - } else { - IPv4.WILDCARD.hostAddress + "/" + address.hostAddress - } - } else { - IP.toString(address!!) - } - - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - "Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" - } else { - "Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" - } - } - - override fun close() { - if (success) { - subscription.close() - publication.close() - } - } - - override fun toString(): String { - return "$addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" - } -} - -/** - * For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER - * NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast - */ -class IpcMediaDriverConnection(streamId: Int, - val streamIdSubscription: Int, - sessionId: Int, - ) : - MediaDriverConnection(null, 0, 0, streamId, sessionId, 1_000, true) { - - var success: Boolean = false - - private fun uri(): ChannelUriStringBuilder { - val builder = ChannelUriStringBuilder().media("ipc") - if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - builder.sessionId(sessionId) - } - - return builder - } - - /** - * Set up the subscription + publication channels to the server - * - * @throws ClientTimedOutException if we cannot connect to the server in the designated time - */ - override suspend fun buildClient(aeron: Aeron, logger: KLogger) { - // Create a publication at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - - - if (logger.isTraceEnabled) { - logger.trace("IPC client pub URI: ${publicationUri.build()}") - logger.trace("IPC server sub URI: ${subscriptionUri.build()}") - } - - // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe - // publication of any state to other threads and not be long running or re-entrant with the client. - - // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. - val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) - - var success = false - - // this will wait for the server to acknowledge the connection (all via aeron) - var startTime = System.currentTimeMillis() - while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { - if (subscription.isConnected && subscription.imageCount() > 0) { - success = true - break - } - - delay(timeMillis = 100L) - } - - if (!success) { - subscription.close() - throw ClientTimedOutException("Creating subscription connection to aeron") - } - - - success = false - - // this will wait for the server to acknowledge the connection (all via aeron) - startTime = System.currentTimeMillis() - while (System.currentTimeMillis() - startTime < connectionTimeoutMS) { - if (publication.isConnected) { - success = true - break - } - - delay(timeMillis = 100L) - } - - if (!success) { - subscription.close() - publication.close() - throw ClientTimedOutException("Creating publication connection to aeron") - } - - this.success = true - this.publication = publication - this.subscription = subscription - } - - /** - * Setup the subscription + publication channels on the server - */ - override suspend fun buildServer(aeron: Aeron, logger: KLogger) { - // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - - - if (logger.isTraceEnabled) { - logger.trace("IPC server pub URI: ${publicationUri.build()}") - logger.trace("IPC server sub URI: ${subscriptionUri.build()}") - } - - // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe - // publication of any state to other threads and not be long running or re-entrant with the client. - - // on close, the publication CAN linger (in case a client goes away, and then comes back) - // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - - // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. - publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) - } - - override fun clientInfo() : String { - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - "[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]" - } else { - "Connecting handshake to IPC [$streamIdSubscription|$streamId]" - } - } - - override fun serverInfo() : String { - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { - "[$sessionId] IPC listening on [$streamIdSubscription|$streamId] " - } else { - "Listening handshake on IPC [$streamIdSubscription|$streamId]" - } - } - - override fun close() { - if (success) { - subscription.close() - publication.close() - } - } - - override fun toString(): String { - return "[$streamIdSubscription|$streamId] [$sessionId]" - } -} diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt new file mode 100644 index 00000000..8951f73e --- /dev/null +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -0,0 +1,177 @@ +/* + * Copyright 2021 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.aeron + +import dorkbox.netUtil.IP +import dorkbox.network.exceptions.ClientException +import dorkbox.network.exceptions.ClientTimedOutException +import io.aeron.Aeron +import io.aeron.ChannelUriStringBuilder +import kotlinx.coroutines.delay +import mu.KLogger +import java.net.Inet4Address +import java.net.InetAddress +import java.util.concurrent.TimeUnit + +/** + * For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER. + * A connection timeout of 0, means to wait forever + */ +internal class UdpMediaDriverClientConnection(val address: InetAddress, + publicationPort: Int, + subscriptionPort: Int, + streamId: Int, + sessionId: Int, + connectionTimeoutMS: Long = 0, + isReliable: Boolean = true) : + MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + + var success: Boolean = false + + private fun aeronConnectionString(ipAddress: InetAddress): String { + return if (ipAddress is Inet4Address) { + ipAddress.hostAddress + } else { + // IPv6 requires the address to be bracketed by [...] + val host = ipAddress.hostAddress + if (host[0] == '[') { + host + } else { + "[${ipAddress.hostAddress}]" + } + } + } + + val addressString: String by lazy { + IP.toString(address) + } + + private fun uri(): ChannelUriStringBuilder { + val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") + if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + builder.sessionId(sessionId) + } + + return builder + } + + @Suppress("DuplicatedCode") + override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + val aeronAddressString = aeronConnectionString(address) + + // Create a publication at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + .endpoint("$aeronAddressString:$publicationPort") + + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + .controlEndpoint("$aeronAddressString:$subscriptionPort") + .controlMode("dynamic") + + + + if (logger.isTraceEnabled) { + if (address is Inet4Address) { + logger.trace("IPV4 client pub URI: ${publicationUri.build()}") + logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}") + } else { + logger.trace("IPV6 client pub URI: ${publicationUri.build()}") + logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}") + } + } + + // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe + // publication of any state to other threads and not be long running or re-entrant with the client. + // on close, the publication CAN linger (in case a client goes away, and then comes back) + // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) + val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) + + var success = false + + // this will wait for the server to acknowledge the connection (all via aeron) + val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS) + var startTime = System.nanoTime() + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { + if (subscription.isConnected) { + success = true + break + } + + delay(timeMillis = 100L) + } + + if (!success) { + subscription.close() + throw ClientTimedOutException("Cannot create subscription!") + } + + + success = false + + // this will wait for the server to acknowledge the connection (all via aeron) + startTime = System.nanoTime() + while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) { + if (publication.isConnected) { + success = true + break + } + + delay(timeMillis = 100L) + } + + if (!success) { + subscription.close() + publication.close() + throw ClientTimedOutException("Creating publication connection to aeron") + } + + this.success = true + + this.publication = publication + this.subscription = subscription + } + + override fun clientInfo(): String { + address + + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + } else { + "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" + } + } + + override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + throw ClientException("Server info not implemented in Client MDC") + } + override fun serverInfo(): String { + throw ClientException("Server info not implemented in Client MDC") + } + + override fun close() { + if (success) { + subscription.close() + publication.close() + } + } + + override fun toString(): String { + return "$addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + } +} diff --git a/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt new file mode 100644 index 00000000..f2317487 --- /dev/null +++ b/src/dorkbox/network/aeron/UdpMediaDriverPairedConnection.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2021 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.aeron + +import java.net.InetAddress + +/** + * This represents the connection PAIR between a server<->client + * A connection timeout of 0, means to wait forever + */ +internal class UdpMediaDriverPairedConnection(listenAddress: InetAddress, + val remoteAddress: InetAddress, + val remoteAddressString: String, + publicationPort: Int, + subscriptionPort: Int, + streamId: Int, + sessionId: Int, + connectionTimeoutMS: Long = 0, + isReliable: Boolean = true) : + UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + + override fun toString(): String { + return "$remoteAddressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + } +} diff --git a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt new file mode 100644 index 00000000..e59e27fb --- /dev/null +++ b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt @@ -0,0 +1,139 @@ +/* + * Copyright 2021 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.aeron + +import dorkbox.netUtil.IP +import dorkbox.netUtil.IPv4 +import dorkbox.netUtil.IPv6 +import dorkbox.network.exceptions.ServerException +import io.aeron.Aeron +import io.aeron.ChannelUriStringBuilder +import mu.KLogger +import java.net.Inet4Address +import java.net.InetAddress + +/** + * For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER. + * A connection timeout of 0, means to wait forever + */ +internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddress, + publicationPort: Int, + subscriptionPort: Int, + streamId: Int, + sessionId: Int, + connectionTimeoutMS: Long = 0, + isReliable: Boolean = true) : + MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) { + + var success: Boolean = false + + private fun aeronConnectionString(ipAddress: InetAddress): String { + return if (ipAddress is Inet4Address) { + ipAddress.hostAddress + } else { + // IPv6 requires the address to be bracketed by [...] + val host = ipAddress.hostAddress + if (host[0] == '[') { + host + } else { + "[${ipAddress.hostAddress}]" + } + } + } + + private fun uri(): ChannelUriStringBuilder { + val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") + if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + builder.sessionId(sessionId) + } + + return builder + } + + @Suppress("DuplicatedCode") + override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + throw ServerException("Client info not implemented in Server MDC") + } + + override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + val connectionString = aeronConnectionString(listenAddress) + + // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + .controlEndpoint("$connectionString:$publicationPort") + .controlMode("dynamic") + + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + .endpoint("$connectionString:$subscriptionPort") + + + + if (logger.isTraceEnabled) { + if (listenAddress is Inet4Address) { + logger.trace("IPV4 server pub URI: ${publicationUri.build()}") + logger.trace("IPV4 server sub URI: ${subscriptionUri.build()}") + } else { + logger.trace("IPV6 server pub URI: ${publicationUri.build()}") + logger.trace("IPV6 server sub URI: ${subscriptionUri.build()}") + } + } + + // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe + // publication of any state to other threads and not be long running or re-entrant with the client. + // on close, the publication CAN linger (in case a client goes away, and then comes back) + // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) + + // If we start/stop too quickly, we might have the address already in use! Retry a few times. + publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) + subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) + } + + override fun clientInfo(): String { + throw ServerException("Client info not implemented in Server MDC") + } + + override fun serverInfo(): String { + val address = if (listenAddress == IPv4.WILDCARD || listenAddress == IPv6.WILDCARD) { + if (listenAddress == IPv4.WILDCARD) { + listenAddress.hostAddress + } else { + IPv4.WILDCARD.hostAddress + "/" + listenAddress.hostAddress + } + } else { + IP.toString(listenAddress) + } + + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + "Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + } else { + "Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" + } + } + + override fun close() { + if (success) { + subscription.close() + publication.close() + } + } + + override fun toString(): String { + return "$IP.toString(listenAddress) [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" + } +} diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 7480a02a..fbe076b4 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -16,7 +16,9 @@ package dorkbox.network.connection import dorkbox.network.aeron.IpcMediaDriverConnection -import dorkbox.network.aeron.UdpMediaDriverConnection +import dorkbox.network.aeron.UdpMediaDriverClientConnection +import dorkbox.network.aeron.UdpMediaDriverPairedConnection +import dorkbox.network.aeron.UdpMediaDriverServerConnection import dorkbox.network.handshake.ConnectionCounts import dorkbox.network.handshake.RandomIdAllocator import dorkbox.network.ping.Ping @@ -83,7 +85,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { /** * @return true if this connection is a network connection */ - val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverConnection + val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverServerConnection /** * the endpoint associated with this connection @@ -144,24 +146,35 @@ open class Connection(connectionParameters: ConnectionParams<*>) { subscription = mediaDriverConnection.subscription publication = mediaDriverConnection.publication - remoteAddress = mediaDriverConnection.address // this can be the IP address or "ipc" word id = mediaDriverConnection.sessionId // NOTE: this is UNIQUE per server! if (mediaDriverConnection is IpcMediaDriverConnection) { streamId = 0 // this is because with IPC, we have stream sub/pub (which are replaced as port sub/pub) subscriptionPort = mediaDriverConnection.streamIdSubscription publicationPort = mediaDriverConnection.streamId + + remoteAddress = null remoteAddressString = "ipc" toString0 = "[$id] IPC [$subscriptionPort|$publicationPort]" } else { - mediaDriverConnection as UdpMediaDriverConnection - streamId = mediaDriverConnection.streamId // NOTE: this is UNIQUE per server! subscriptionPort = mediaDriverConnection.subscriptionPort publicationPort = mediaDriverConnection.publicationPort - remoteAddressString = mediaDriverConnection.addressString + when (mediaDriverConnection) { + is UdpMediaDriverClientConnection -> { + remoteAddress = mediaDriverConnection.address + remoteAddressString = mediaDriverConnection.addressString + } + is UdpMediaDriverPairedConnection -> { + remoteAddress = mediaDriverConnection.remoteAddress + remoteAddressString = mediaDriverConnection.remoteAddressString + } + else -> { + throw Exception("Invalid media driver connection type! : ${mediaDriverConnection::class.qualifiedName}") + } + } toString0 = "[$id] $remoteAddressString [$publicationPort|$subscriptionPort]" }