Fixed server-client pair connection address info

This commit is contained in:
Robinson 2021-04-27 10:26:37 +02:00
parent 7d39f70450
commit 1765b4e367
6 changed files with 546 additions and 348 deletions

View File

@ -0,0 +1,171 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
/**
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast
*/
internal class IpcMediaDriverConnection(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int,
) :
MediaDriverConnection(0, 0, streamId, sessionId, 1_000, true) {
var success: Boolean = false
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().media("ipc")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
return builder
}
/**
* Set up the subscription + publication channels to the server
*
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
if (logger.isTraceEnabled) {
logger.trace("IPC client pub URI: ${publicationUri.build()}")
logger.trace("IPC server sub URI: ${subscriptionUri.build()}")
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
var success = false
// this will wait for the server to acknowledge the connection (all via aeron)
var startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
if (subscription.isConnected && subscription.imageCount() > 0) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
throw ClientTimedOutException("Creating subscription connection to aeron")
}
success = false
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
if (publication.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
publication.close()
throw ClientTimedOutException("Creating publication connection to aeron")
}
this.success = true
this.publication = publication
this.subscription = subscription
}
/**
* Setup the subscription + publication channels on the server.
*
* serverAddress is ignored for IPC
*/
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
if (logger.isTraceEnabled) {
logger.trace("IPC server pub URI: ${publicationUri.build()}")
logger.trace("IPC server sub URI: ${subscriptionUri.build()}")
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
}
override fun clientInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]"
} else {
"Connecting handshake to IPC [$streamIdSubscription|$streamId]"
}
}
override fun serverInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC listening on [$streamIdSubscription|$streamId] "
} else {
"Listening handshake on IPC [$streamIdSubscription|$streamId]"
}
}
override fun close() {
if (success) {
subscription.close()
publication.close()
}
}
override fun toString(): String {
return "[$streamIdSubscription|$streamId] [$sessionId]"
}
}

View File

@ -17,22 +17,14 @@
package dorkbox.network.aeron
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import io.aeron.Publication
import io.aeron.Subscription
import kotlinx.coroutines.delay
import mu.KLogger
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.TimeUnit
abstract class MediaDriverConnection(val address: InetAddress?,
val publicationPort: Int, val subscriptionPort: Int,
abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionPort: Int,
val streamId: Int, val sessionId: Int,
val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable {
@ -81,336 +73,3 @@ abstract class MediaDriverConnection(val address: InetAddress?,
abstract fun clientInfo() : String
abstract fun serverInfo() : String
}
/**
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
* A connection timeout of 0, means to wait forever
*/
class UdpMediaDriverConnection(address: InetAddress,
publicationPort: Int,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
isReliable: Boolean = true) :
MediaDriverConnection(address, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
var success: Boolean = false
val addressString: String by lazy {
if (address is Inet4Address) {
address.hostAddress
} else {
// IPv6 requires the address to be bracketed by [...]
val host = address.hostAddress
if (host[0] == '[') {
host
} else {
"[${address.hostAddress}]"
}
}
}
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
return builder
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.endpoint("$addressString:$publicationPort")
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.controlEndpoint("$addressString:$subscriptionPort")
.controlMode("dynamic")
if (logger.isTraceEnabled) {
if (address is Inet4Address) {
logger.trace("IPV4 client pub URI: ${publicationUri.build()}")
logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}")
} else {
logger.trace("IPV6 client pub URI: ${publicationUri.build()}")
logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}")
}
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
var success = false
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS)
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
throw ClientTimedOutException("Cannot create subscription!")
}
success = false
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
publication.close()
throw ClientTimedOutException("Creating publication connection to aeron")
}
this.success = true
this.publication = publication
this.subscription = subscription
}
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.controlEndpoint("$addressString:$publicationPort")
.controlMode("dynamic")
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.endpoint("$addressString:$subscriptionPort")
if (logger.isTraceEnabled) {
if (address is Inet4Address) {
logger.trace("IPV4 server pub URI: ${publicationUri.build()}")
logger.trace("IPV4 server sub URI: ${subscriptionUri.build()}")
} else {
logger.trace("IPV6 server pub URI: ${publicationUri.build()}")
logger.trace("IPV6 server sub URI: ${subscriptionUri.build()}")
}
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
}
override fun clientInfo(): String {
address!!
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
}
}
override fun serverInfo(): String {
val address = if (address == IPv4.WILDCARD || address == IPv6.WILDCARD) {
if (address == IPv4.WILDCARD) {
address.hostAddress
} else {
IPv4.WILDCARD.hostAddress + "/" + address.hostAddress
}
} else {
IP.toString(address!!)
}
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
}
}
override fun close() {
if (success) {
subscription.close()
publication.close()
}
}
override fun toString(): String {
return "$addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
}
}
/**
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast
*/
class IpcMediaDriverConnection(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int,
) :
MediaDriverConnection(null, 0, 0, streamId, sessionId, 1_000, true) {
var success: Boolean = false
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().media("ipc")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
return builder
}
/**
* Set up the subscription + publication channels to the server
*
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
if (logger.isTraceEnabled) {
logger.trace("IPC client pub URI: ${publicationUri.build()}")
logger.trace("IPC server sub URI: ${subscriptionUri.build()}")
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
var success = false
// this will wait for the server to acknowledge the connection (all via aeron)
var startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
if (subscription.isConnected && subscription.imageCount() > 0) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
throw ClientTimedOutException("Creating subscription connection to aeron")
}
success = false
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < connectionTimeoutMS) {
if (publication.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
publication.close()
throw ClientTimedOutException("Creating publication connection to aeron")
}
this.success = true
this.publication = publication
this.subscription = subscription
}
/**
* Setup the subscription + publication channels on the server
*/
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
if (logger.isTraceEnabled) {
logger.trace("IPC server pub URI: ${publicationUri.build()}")
logger.trace("IPC server sub URI: ${subscriptionUri.build()}")
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
}
override fun clientInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]"
} else {
"Connecting handshake to IPC [$streamIdSubscription|$streamId]"
}
}
override fun serverInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC listening on [$streamIdSubscription|$streamId] "
} else {
"Listening handshake on IPC [$streamIdSubscription|$streamId]"
}
}
override fun close() {
if (success) {
subscription.close()
publication.close()
}
}
override fun toString(): String {
return "[$streamIdSubscription|$streamId] [$sessionId]"
}
}

View File

@ -0,0 +1,177 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
import dorkbox.netUtil.IP
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.TimeUnit
/**
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
* A connection timeout of 0, means to wait forever
*/
internal class UdpMediaDriverClientConnection(val address: InetAddress,
publicationPort: Int,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
isReliable: Boolean = true) :
MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
var success: Boolean = false
private fun aeronConnectionString(ipAddress: InetAddress): String {
return if (ipAddress is Inet4Address) {
ipAddress.hostAddress
} else {
// IPv6 requires the address to be bracketed by [...]
val host = ipAddress.hostAddress
if (host[0] == '[') {
host
} else {
"[${ipAddress.hostAddress}]"
}
}
}
val addressString: String by lazy {
IP.toString(address)
}
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
return builder
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
val aeronAddressString = aeronConnectionString(address)
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.endpoint("$aeronAddressString:$publicationPort")
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.controlEndpoint("$aeronAddressString:$subscriptionPort")
.controlMode("dynamic")
if (logger.isTraceEnabled) {
if (address is Inet4Address) {
logger.trace("IPV4 client pub URI: ${publicationUri.build()}")
logger.trace("IPV4 client sub URI: ${subscriptionUri.build()}")
} else {
logger.trace("IPV6 client pub URI: ${publicationUri.build()}")
logger.trace("IPV6 client sub URI: ${subscriptionUri.build()}")
}
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
var success = false
// this will wait for the server to acknowledge the connection (all via aeron)
val timoutInNanos = TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMS)
var startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (subscription.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
throw ClientTimedOutException("Cannot create subscription!")
}
success = false
// this will wait for the server to acknowledge the connection (all via aeron)
startTime = System.nanoTime()
while (timoutInNanos == 0L || System.nanoTime() - startTime < timoutInNanos) {
if (publication.isConnected) {
success = true
break
}
delay(timeMillis = 100L)
}
if (!success) {
subscription.close()
publication.close()
throw ClientTimedOutException("Creating publication connection to aeron")
}
this.success = true
this.publication = publication
this.subscription = subscription
}
override fun clientInfo(): String {
address
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
}
}
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
throw ClientException("Server info not implemented in Client MDC")
}
override fun serverInfo(): String {
throw ClientException("Server info not implemented in Client MDC")
}
override fun close() {
if (success) {
subscription.close()
publication.close()
}
}
override fun toString(): String {
return "$addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
import java.net.InetAddress
/**
* This represents the connection PAIR between a server<->client
* A connection timeout of 0, means to wait forever
*/
internal class UdpMediaDriverPairedConnection(listenAddress: InetAddress,
val remoteAddress: InetAddress,
val remoteAddressString: String,
publicationPort: Int,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
isReliable: Boolean = true) :
UdpMediaDriverServerConnection(listenAddress, publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
override fun toString(): String {
return "$remoteAddressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
}
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.exceptions.ServerException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import mu.KLogger
import java.net.Inet4Address
import java.net.InetAddress
/**
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
* A connection timeout of 0, means to wait forever
*/
internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddress,
publicationPort: Int,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutMS: Long = 0,
isReliable: Boolean = true) :
MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutMS, isReliable) {
var success: Boolean = false
private fun aeronConnectionString(ipAddress: InetAddress): String {
return if (ipAddress is Inet4Address) {
ipAddress.hostAddress
} else {
// IPv6 requires the address to be bracketed by [...]
val host = ipAddress.hostAddress
if (host[0] == '[') {
host
} else {
"[${ipAddress.hostAddress}]"
}
}
}
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
return builder
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
throw ServerException("Client info not implemented in Server MDC")
}
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
val connectionString = aeronConnectionString(listenAddress)
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.controlEndpoint("$connectionString:$publicationPort")
.controlMode("dynamic")
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.endpoint("$connectionString:$subscriptionPort")
if (logger.isTraceEnabled) {
if (listenAddress is Inet4Address) {
logger.trace("IPV4 server pub URI: ${publicationUri.build()}")
logger.trace("IPV4 server sub URI: ${subscriptionUri.build()}")
} else {
logger.trace("IPV6 server pub URI: ${publicationUri.build()}")
logger.trace("IPV6 server sub URI: ${subscriptionUri.build()}")
}
}
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
}
override fun clientInfo(): String {
throw ServerException("Client info not implemented in Server MDC")
}
override fun serverInfo(): String {
val address = if (listenAddress == IPv4.WILDCARD || listenAddress == IPv6.WILDCARD) {
if (listenAddress == IPv4.WILDCARD) {
listenAddress.hostAddress
} else {
IPv4.WILDCARD.hostAddress + "/" + listenAddress.hostAddress
}
} else {
IP.toString(listenAddress)
}
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
}
}
override fun close() {
if (success) {
subscription.close()
publication.close()
}
}
override fun toString(): String {
return "$IP.toString(listenAddress) [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
}
}

View File

@ -16,7 +16,9 @@
package dorkbox.network.connection
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.aeron.UdpMediaDriverServerConnection
import dorkbox.network.handshake.ConnectionCounts
import dorkbox.network.handshake.RandomIdAllocator
import dorkbox.network.ping.Ping
@ -83,7 +85,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
/**
* @return true if this connection is a network connection
*/
val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverConnection
val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverServerConnection
/**
* the endpoint associated with this connection
@ -144,24 +146,35 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
subscription = mediaDriverConnection.subscription
publication = mediaDriverConnection.publication
remoteAddress = mediaDriverConnection.address // this can be the IP address or "ipc" word
id = mediaDriverConnection.sessionId // NOTE: this is UNIQUE per server!
if (mediaDriverConnection is IpcMediaDriverConnection) {
streamId = 0 // this is because with IPC, we have stream sub/pub (which are replaced as port sub/pub)
subscriptionPort = mediaDriverConnection.streamIdSubscription
publicationPort = mediaDriverConnection.streamId
remoteAddress = null
remoteAddressString = "ipc"
toString0 = "[$id] IPC [$subscriptionPort|$publicationPort]"
} else {
mediaDriverConnection as UdpMediaDriverConnection
streamId = mediaDriverConnection.streamId // NOTE: this is UNIQUE per server!
subscriptionPort = mediaDriverConnection.subscriptionPort
publicationPort = mediaDriverConnection.publicationPort
remoteAddressString = mediaDriverConnection.addressString
when (mediaDriverConnection) {
is UdpMediaDriverClientConnection -> {
remoteAddress = mediaDriverConnection.address
remoteAddressString = mediaDriverConnection.addressString
}
is UdpMediaDriverPairedConnection -> {
remoteAddress = mediaDriverConnection.remoteAddress
remoteAddressString = mediaDriverConnection.remoteAddressString
}
else -> {
throw Exception("Invalid media driver connection type! : ${mediaDriverConnection::class.qualifiedName}")
}
}
toString0 = "[$id] $remoteAddressString [$publicationPort|$subscriptionPort]"
}