Code cleanup

This commit is contained in:
Robinson 2021-04-27 10:28:36 +02:00
parent 29b0ee8199
commit bb245e2f8e
3 changed files with 64 additions and 56 deletions

View File

@ -20,7 +20,7 @@ import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6 import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.connection.* import dorkbox.network.connection.*
import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
@ -249,7 +249,6 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
this.remoteAddress0 = remoteAddress this.remoteAddress0 = remoteAddress
connection0 = null connection0 = null
// we are done with initial configuration, now initialize aeron and the general state of this endpoint // we are done with initial configuration, now initialize aeron and the general state of this endpoint
val aeron = initEndpointState() val aeron = initEndpointState()
@ -302,26 +301,30 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" } logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" }
// try a UDP connection instead // try a UDP connection instead
val udpConnection = UdpMediaDriverConnection(address = this.remoteAddress0!!, val udpConnection = UdpMediaDriverClientConnection(
publicationPort = config.subscriptionPort, address = this.remoteAddress0!!,
subscriptionPort = config.publicationPort, publicationPort = config.subscriptionPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, subscriptionPort = config.publicationPort,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
connectionTimeoutMS = connectionTimeoutMS, sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
isReliable = reliable) connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.buildClient(aeron, logger) udpConnection.buildClient(aeron, logger)
udpConnection udpConnection
} }
} }
else { else {
val test = UdpMediaDriverConnection(address = this.remoteAddress0!!, val test = UdpMediaDriverClientConnection(
publicationPort = config.subscriptionPort, address = this.remoteAddress0!!,
subscriptionPort = config.publicationPort, publicationPort = config.subscriptionPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, subscriptionPort = config.publicationPort,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
connectionTimeoutMS = connectionTimeoutMS, sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
isReliable = reliable) connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeron, logger) test.buildClient(aeron, logger)
test test
@ -358,31 +361,33 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// we are now connected, so we can connect to the NEW client-specific ports // we are now connected, so we can connect to the NEW client-specific ports
val reliableClientConnection = if (isUsingIPC) { val clientConnection = if (isUsingIPC) {
IpcMediaDriverConnection(sessionId = connectionInfo.sessionId, IpcMediaDriverConnection(
// NOTE: pub/sub must be switched! sessionId = connectionInfo.sessionId,
streamIdSubscription = connectionInfo.publicationPort, // NOTE: pub/sub must be switched!
streamId = connectionInfo.subscriptionPort) streamIdSubscription = connectionInfo.publicationPort,
streamId = connectionInfo.subscriptionPort)
} }
else { else {
UdpMediaDriverConnection(address = handshakeConnection.address!!, UdpMediaDriverClientConnection(
// NOTE: pub/sub must be switched! address = (handshakeConnection as UdpMediaDriverClientConnection).address,
publicationPort = connectionInfo.subscriptionPort, // NOTE: pub/sub must be switched!
subscriptionPort = connectionInfo.publicationPort, publicationPort = connectionInfo.subscriptionPort,
streamId = connectionInfo.streamId, subscriptionPort = connectionInfo.publicationPort,
sessionId = connectionInfo.sessionId, streamId = connectionInfo.streamId,
connectionTimeoutMS = connectionTimeoutMS, sessionId = connectionInfo.sessionId,
isReliable = handshakeConnection.isReliable) connectionTimeoutMS = connectionTimeoutMS,
isReliable = handshakeConnection.isReliable)
} }
// we have to construct how the connection will communicate! // we have to construct how the connection will communicate!
reliableClientConnection.buildClient(aeron, logger) clientConnection.buildClient(aeron, logger)
// only the client connects to the server, so here we have to connect. The server (when creating the new "connection" object) // only the client connects to the server, so here we have to connect. The server (when creating the new "connection" object)
// does not need to do anything // does not need to do anything
// //
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server-assigned client ports // throws a ConnectTimedOutException if the client cannot connect for any reason to the server-assigned client ports
logger.info(reliableClientConnection.clientInfo()) logger.info(clientConnection.clientInfo())
/////////////// ///////////////
@ -408,9 +413,9 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val newConnection: CONNECTION val newConnection: CONNECTION
if (isUsingIPC) { if (isUsingIPC) {
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID)) newConnection = newConnection(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID))
} else { } else {
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress)) newConnection = newConnection(ConnectionParams(this, clientConnection, validateRemoteAddress))
remoteAddress!! remoteAddress!!

View File

@ -21,7 +21,7 @@ import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverServerConnection
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager
@ -215,12 +215,13 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
private suspend fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { private suspend fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
val poller = if (canUseIPv4) { val poller = if (canUseIPv4) {
val driver = UdpMediaDriverConnection(address = listenIPv4Address!!, val driver = UdpMediaDriverServerConnection(
publicationPort = config.publicationPort, listenAddress = listenIPv4Address!!,
subscriptionPort = config.subscriptionPort, publicationPort = config.publicationPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, subscriptionPort = config.subscriptionPort,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -252,7 +253,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// val port = remoteIpAndPort.substring(splitPoint+1) // val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron // this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IPv4.fromStringUnsafe(clientAddressString) val clientAddress = IPv4.toAddressUnsafe(clientAddressString)
val message = readHandshakeMessage(buffer, offset, length, header) val message = readHandshakeMessage(buffer, offset, length, header)
@ -296,12 +297,13 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
private suspend fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { private suspend fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
val poller = if (canUseIPv6) { val poller = if (canUseIPv6) {
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, val driver = UdpMediaDriverServerConnection(
publicationPort = config.publicationPort, listenAddress = listenIPv6Address!!,
subscriptionPort = config.subscriptionPort, publicationPort = config.publicationPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, subscriptionPort = config.subscriptionPort,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -333,7 +335,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// val port = remoteIpAndPort.substring(splitPoint+1) // val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron // this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IPv6.fromString(clientAddressString)!! val clientAddress = IPv6.toAddress(clientAddressString)!!
val message = readHandshakeMessage(buffer, offset, length, header) val message = readHandshakeMessage(buffer, offset, length, header)
@ -376,12 +378,13 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
private suspend fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { private suspend fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, val driver = UdpMediaDriverServerConnection(
publicationPort = config.publicationPort, listenAddress = listenIPv6Address!!,
subscriptionPort = config.subscriptionPort, publicationPort = config.publicationPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, subscriptionPort = config.subscriptionPort,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -414,7 +417,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// this should never be null, because we are feeding it a valid IP address from aeron // this should never be null, because we are feeding it a valid IP address from aeron
// maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is. // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is.
val clientAddress = IP.fromString(clientAddressString)!! val clientAddress = IP.toAddress(clientAddressString)!!
val message = readHandshakeMessage(buffer, offset, length, header) val message = readHandshakeMessage(buffer, offset, length, header)

View File

@ -77,7 +77,7 @@ class PropertyStore(val dbFile: File, val logger: KLogger): GenericStore {
SettingsStore.saltKey -> loadedProps[SettingsStore.saltKey] = Sys.hexToBytes(value) SettingsStore.saltKey -> loadedProps[SettingsStore.saltKey] = Sys.hexToBytes(value)
SettingsStore.privateKey -> loadedProps[SettingsStore.privateKey] = Sys.hexToBytes(value) SettingsStore.privateKey -> loadedProps[SettingsStore.privateKey] = Sys.hexToBytes(value)
else -> { else -> {
val address: InetAddress? = IP.fromString(key) val address: InetAddress? = IP.toAddress(key)
if (address != null) { if (address != null) {
loadedProps[address] = Sys.hexToBytes(value) loadedProps[address] = Sys.hexToBytes(value)
} else { } else {