diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 48572a30..a05d10c4 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -20,7 +20,7 @@ import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.IpcMediaDriverConnection -import dorkbox.network.aeron.UdpMediaDriverConnection +import dorkbox.network.aeron.UdpMediaDriverClientConnection import dorkbox.network.connection.* import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.exceptions.ClientException @@ -249,7 +249,6 @@ open class Client(config: Configuration = Configuration this.remoteAddress0 = remoteAddress connection0 = null - // we are done with initial configuration, now initialize aeron and the general state of this endpoint val aeron = initEndpointState() @@ -302,26 +301,30 @@ open class Client(config: Configuration = Configuration logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" } // try a UDP connection instead - val udpConnection = UdpMediaDriverConnection(address = this.remoteAddress0!!, - publicationPort = config.subscriptionPort, - subscriptionPort = config.publicationPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = connectionTimeoutMS, - isReliable = reliable) + val udpConnection = UdpMediaDriverClientConnection( + address = this.remoteAddress0!!, + publicationPort = config.subscriptionPort, + subscriptionPort = config.publicationPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = connectionTimeoutMS, + isReliable = reliable) + // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports udpConnection.buildClient(aeron, logger) udpConnection } } else { - val test = UdpMediaDriverConnection(address = this.remoteAddress0!!, - publicationPort = config.subscriptionPort, - subscriptionPort = config.publicationPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = connectionTimeoutMS, - isReliable = reliable) + val test = UdpMediaDriverClientConnection( + address = this.remoteAddress0!!, + publicationPort = config.subscriptionPort, + subscriptionPort = config.publicationPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = connectionTimeoutMS, + isReliable = reliable) + // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports test.buildClient(aeron, logger) test @@ -358,31 +361,33 @@ open class Client(config: Configuration = Configuration // we are now connected, so we can connect to the NEW client-specific ports - val reliableClientConnection = if (isUsingIPC) { - IpcMediaDriverConnection(sessionId = connectionInfo.sessionId, - // NOTE: pub/sub must be switched! - streamIdSubscription = connectionInfo.publicationPort, - streamId = connectionInfo.subscriptionPort) + val clientConnection = if (isUsingIPC) { + IpcMediaDriverConnection( + sessionId = connectionInfo.sessionId, + // NOTE: pub/sub must be switched! + streamIdSubscription = connectionInfo.publicationPort, + streamId = connectionInfo.subscriptionPort) } else { - UdpMediaDriverConnection(address = handshakeConnection.address!!, - // NOTE: pub/sub must be switched! - publicationPort = connectionInfo.subscriptionPort, - subscriptionPort = connectionInfo.publicationPort, - streamId = connectionInfo.streamId, - sessionId = connectionInfo.sessionId, - connectionTimeoutMS = connectionTimeoutMS, - isReliable = handshakeConnection.isReliable) + UdpMediaDriverClientConnection( + address = (handshakeConnection as UdpMediaDriverClientConnection).address, + // NOTE: pub/sub must be switched! + publicationPort = connectionInfo.subscriptionPort, + subscriptionPort = connectionInfo.publicationPort, + streamId = connectionInfo.streamId, + sessionId = connectionInfo.sessionId, + connectionTimeoutMS = connectionTimeoutMS, + isReliable = handshakeConnection.isReliable) } // we have to construct how the connection will communicate! - reliableClientConnection.buildClient(aeron, logger) + clientConnection.buildClient(aeron, logger) // only the client connects to the server, so here we have to connect. The server (when creating the new "connection" object) // does not need to do anything // // throws a ConnectTimedOutException if the client cannot connect for any reason to the server-assigned client ports - logger.info(reliableClientConnection.clientInfo()) + logger.info(clientConnection.clientInfo()) /////////////// @@ -408,9 +413,9 @@ open class Client(config: Configuration = Configuration val newConnection: CONNECTION if (isUsingIPC) { - newConnection = newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID)) + newConnection = newConnection(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID)) } else { - newConnection = newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress)) + newConnection = newConnection(ConnectionParams(this, clientConnection, validateRemoteAddress)) remoteAddress!! diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 1e5b3cab..4e9473be 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -21,7 +21,7 @@ import dorkbox.netUtil.IPv6 import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.IpcMediaDriverConnection -import dorkbox.network.aeron.UdpMediaDriverConnection +import dorkbox.network.aeron.UdpMediaDriverServerConnection import dorkbox.network.connection.Connection import dorkbox.network.connection.EndPoint import dorkbox.network.connection.ListenerManager @@ -215,12 +215,13 @@ open class Server(config: ServerConfiguration = ServerC @Suppress("DuplicatedCode") private suspend fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { val poller = if (canUseIPv4) { - val driver = UdpMediaDriverConnection(address = listenIPv4Address!!, - publicationPort = config.publicationPort, - subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + val driver = UdpMediaDriverServerConnection( + listenAddress = listenIPv4Address!!, + publicationPort = config.publicationPort, + subscriptionPort = config.subscriptionPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) driver.buildServer(aeron, logger) val publication = driver.publication @@ -252,7 +253,7 @@ open class Server(config: ServerConfiguration = ServerC // val port = remoteIpAndPort.substring(splitPoint+1) // this should never be null, because we are feeding it a valid IP address from aeron - val clientAddress = IPv4.fromStringUnsafe(clientAddressString) + val clientAddress = IPv4.toAddressUnsafe(clientAddressString) val message = readHandshakeMessage(buffer, offset, length, header) @@ -296,12 +297,13 @@ open class Server(config: ServerConfiguration = ServerC @Suppress("DuplicatedCode") private suspend fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { val poller = if (canUseIPv6) { - val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, - publicationPort = config.publicationPort, - subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + val driver = UdpMediaDriverServerConnection( + listenAddress = listenIPv6Address!!, + publicationPort = config.publicationPort, + subscriptionPort = config.subscriptionPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) driver.buildServer(aeron, logger) val publication = driver.publication @@ -333,7 +335,7 @@ open class Server(config: ServerConfiguration = ServerC // val port = remoteIpAndPort.substring(splitPoint+1) // this should never be null, because we are feeding it a valid IP address from aeron - val clientAddress = IPv6.fromString(clientAddressString)!! + val clientAddress = IPv6.toAddress(clientAddressString)!! val message = readHandshakeMessage(buffer, offset, length, header) @@ -376,12 +378,13 @@ open class Server(config: ServerConfiguration = ServerC @Suppress("DuplicatedCode") private suspend fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { - val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, - publicationPort = config.publicationPort, - subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) + val driver = UdpMediaDriverServerConnection( + listenAddress = listenIPv6Address!!, + publicationPort = config.publicationPort, + subscriptionPort = config.subscriptionPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) driver.buildServer(aeron, logger) val publication = driver.publication @@ -414,7 +417,7 @@ open class Server(config: ServerConfiguration = ServerC // this should never be null, because we are feeding it a valid IP address from aeron // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is. - val clientAddress = IP.fromString(clientAddressString)!! + val clientAddress = IP.toAddress(clientAddressString)!! val message = readHandshakeMessage(buffer, offset, length, header) diff --git a/src/dorkbox/network/storage/types/PropertyStore.kt b/src/dorkbox/network/storage/types/PropertyStore.kt index d03acb4f..32340f64 100644 --- a/src/dorkbox/network/storage/types/PropertyStore.kt +++ b/src/dorkbox/network/storage/types/PropertyStore.kt @@ -77,7 +77,7 @@ class PropertyStore(val dbFile: File, val logger: KLogger): GenericStore { SettingsStore.saltKey -> loadedProps[SettingsStore.saltKey] = Sys.hexToBytes(value) SettingsStore.privateKey -> loadedProps[SettingsStore.privateKey] = Sys.hexToBytes(value) else -> { - val address: InetAddress? = IP.fromString(key) + val address: InetAddress? = IP.toAddress(key) if (address != null) { loadedProps[address] = Sys.hexToBytes(value) } else {